root/lang/perl/tiarra/trunk/main/Tiarra/Socket/Connect.pm @ 11203

Revision 11203, 10.7 kB (checked in by topia, 6 years ago)

* fix default option handling.
* add documentation about unix domain socket.

  • Property svn:mime-type set to text/x-perl; charset=UTF-8
  • Property svn:eol-style set to LF
  • Property svn:keywords set to Id URL Date Rev Author
Line 
1# -----------------------------------------------------------------------------
2# $Id$
3# -----------------------------------------------------------------------------
4# Socket Connector
5# -----------------------------------------------------------------------------
6# copyright (C) 2004 Topia <topia@clovery.jp>. all rights reserved.
7package Tiarra::Socket::Connect;
8use strict;
9use warnings;
10use Carp;
11use Tiarra::Socket;
12use base qw(Tiarra::Socket);
13use Timer;
14use Tiarra::OptionalModules;
15use Tiarra::Utils;
# Generate the accessor methods for this class (the generator helpers are
# presumably provided by Tiarra::Utils).
# NOTE(review): the leading 0 is handed to define_attr_accessor unchanged;
# its meaning is defined by the helper, not in this file.
utils->define_attr_accessor(
    0,
    qw(domain host addr port callback bind_addr prefer timeout
       retry_int retry_count try_count));
# Enumerated accessor for 'domain' over the values tcp and unix; presumably
# the origin of the domain_tcp / domain_unix predicates used further down.
utils->define_attr_enum_accessor('domain', 'eq', qw(tcp unix));
21
22# now supported tcp, unix
23
24# tcp:
25#   my $connector = connect->new(
26#       host => [hostname],
27#       port => [port],
28#       callback => sub {
29#           my ($genre, $connector, $msg_or_sock, $errno) = @_;
30#           if ($genre eq 'warn') {
31#               # $msg_or_sock: msg
32#               # maybe don't have $errno
33#               warn $msg_or_sock;
34#           } elsif ($genre eq 'error') {
35#               # $msg_or_sock: msg
36#               # maybe has $errno
37#               die $msg_or_sock;
38#           } elsif ($genre eq 'sock') {
39#               # $msg_or_sock: sock
40#               # maybe don't have $errno
41#               attach($connector->current_addr, $connector->current_port,
42#                      $msg_or_sock);
43#           # optional genre
44#           } elsif ($genre eq 'interrupt') {
45#               # $msg_or_sock: undef
46#               # maybe don't have $errno
47#               die 'interrupted';
48#           } elsif ($genre eq 'timeout') {
49#               # $msg_or_sock: undef
50#               # maybe don't have $errno
51#               die 'timeout';
52#           }
53#       },
54#       # optional params
55#       addr => [already resolved addr],
56#       bind_addr => [bind_addr (cannot specify host)],
57#       timeout => [timeout], # not thoroughly tested; please report any bugs you find.
58#       retry_int => [retry interval],
59#       retry_count => [retry count],
60#       prefer => [preferred socket types, in order of preference, as an array
61#                  ref of strings ('ipv4', 'ipv6'); default: ipv6, then ipv4],
62#       domain => 'tcp', # default
63#       );
64#   $connector->interrupt;
65# unix:
66#   my $connector = connect->new(
67#       addr => [path],
68#       callback => sub {
69#           # ... same as tcp domain's
70#       },
71#       # optional params
72#       domain => 'unix', # must be given explicitly; the default is 'tcp'
73#       );
74
# Construct a connector and immediately start connecting.
#
# Recognised options: host, addr, port, callback, bind_addr, timeout,
# retry_int, retry_count, prefer and domain (see the usage notes above).
# 'domain' defaults to 'tcp'.  When 'prefer' is not given it defaults to
# ipv6 (only if Tiarra::OptionalModules reports IPv6 support) followed by
# ipv4 for tcp, and to 'unix' for the unix domain.
#
# Croaks when no callback is supplied or the domain is neither tcp nor unix.
# Returns the value of ->connect, i.e. the connector itself.
sub new {
    my ($class, %opts) = @_;

    $class->_increment_caller('socket-connector', \%opts);
    my $this = $class->SUPER::new(%opts);
    # a plain loop: the accessors are called purely for their side effect
    foreach my $attr (qw(host addr port callback bind_addr timeout
                         retry_int retry_count)) {
        $this->$attr($opts{$attr});
    }

    if (!defined $this->callback) {
        croak 'callback closure required';
    }

    $this->domain(utils->get_first_defined($opts{domain}, 'tcp'));
    # validate the domain up front, so an unknown value is rejected even
    # when the caller supplied an explicit 'prefer' list
    if (!$this->domain_tcp && !$this->domain_unix) {
        croak 'Unsupported domain: '. $this->domain;
    }
    $this->prefer($opts{prefer});

    if (!defined $this->prefer) {
        if ($this->domain_tcp) {
            my @prefer = ('ipv4');
            if (Tiarra::OptionalModules->ipv6) {
                unshift(@prefer, 'ipv6')
            }
            $this->prefer(\@prefer);
        } else {
            # only the unix domain is left after the check above
            $this->prefer(['unix']);
        }
    }

    $this->{queue} = [];
    $this->connect;
}
109
# Start (or restart) a connection attempt.
#
# Arms a Timer that calls ->interrupt('timeout') when a timeout is
# configured.  Then either goes straight to _connect_stage (an address is
# already known, or the domain is unix) or asks Tiarra::Resolver to resolve
# the host and continues from its callback.  Returns the connector.
sub connect {
    my ($this) = @_;

    my $limit = $this->timeout;
    if (defined $limit) {
        $this->{timer} = Timer->new(
            After => $limit,
            Code => sub {
                $this->interrupt('timeout');
            });
    }

    if (!defined $this->addr && !$this->domain_unix) {
        # only a host name is known: resolve it first
        my $on_resolved = sub {
            eval {
                $this->_connect_stage(@_);
            };
            if ($@) {
                # anything thrown while connecting is reported as an error
                $this->_connect_error("internal error: $@");
            }
        };
        Tiarra::Resolver->resolve('addr', $this->host, $on_resolved);
    } else {
        # no lookup needed: wrap the address in a ready-made OK answer
        my $answer = Tiarra::Resolver::QueueData->new;
        $answer->answer_status($answer->ANSWER_OK);
        $answer->answer_data([$this->addr]);
        $this->_connect_stage($answer);
    }
    return $this;
}
138
# Turn a resolver answer into queued connection attempts and start trying.
#
#   $entry - resolver answer object (answer_status / ANSWER_OK / answer_data)
#
# Reports an error and returns undef when the answer is not OK.  Otherwise
# the addresses are grouped by the type reported by probe_type_by_addr and
# appended to the queue in the order given by ->prefer; addresses whose type
# is not listed in ->prefer are dropped.
sub _connect_stage {
    my ($this, $entry) = @_;

    if ($entry->answer_status ne $entry->ANSWER_OK) {
        $this->_connect_error("Couldn't resolve hostname");
        return undef; # end
    }

    # bucket the answers by socket type
    my %addrs_of;
    for my $addr (@{$entry->answer_data}) {
        my $type = $this->probe_type_by_addr($addr);
        push @{$addrs_of{$type}}, $addr;
    }

    # queue one attempt per address, preferred types first
    for my $sock_type (@{$this->prefer}) {
        my $candidates = $addrs_of{$sock_type} or next;
        for my $addr (@$candidates) {
            my %attempt = (
                type => $sock_type,
                addr => $addr,
                port => $this->port,
            );
            push @{$this->{queue}}, \%attempt;
        }
    }
    return $this->_connect_try_next;
}
167
# Take the next attempt off the queue and dispatch to _try_connect_<type>.
#
# When the queue is empty: if a retry interval is set and the (incremented)
# try count is still within retry_count, schedule cleanup + connect on a
# Timer and emit an 'all dead, <n> retry' warning; otherwise report the
# final 'all dead' error.
sub _connect_try_next {
    my ($this) = @_;

    my $next = shift @{$this->{queue}};
    $this->{connecting} = $next;
    if (defined $next) {
        my $attempt = "_try_connect_$next->{type}";
        $this->$attempt;
    } elsif ($this->retry_int && (++$this->try_count <= $this->retry_count)) {
        $this->{timer} = Timer->new(
            After => $this->retry_int,
            Code => sub {
                $this->cleanup;
                $this->connect;
            });
        my $nth = utils->to_ordinal_number($this->try_count);
        $this->_connect_warn('all dead, ' . $nth . ' retry');
    } else {
        $this->_connect_error('all dead');
    }
}
191
# IPv4 attempt: a TCP connect using IO::Socket::INET.
sub _try_connect_ipv4 {
    my ($this) = @_;

    return $this->_try_connect_tcp('IO::Socket::INET');
}
197
# IPv6 attempt: a TCP connect using IO::Socket::INET6.
#
# When Tiarra::OptionalModules reports that IPv6 is unavailable, the error
# callback is invoked and the attempt stops there.
sub _try_connect_ipv6 {
    my $this = shift;

    if (!Tiarra::OptionalModules->ipv6) {
        $this->_error(
            qq{Host $this->{host} seems to be an IPv6 address, }.
                qq{but IPv6 support is not enabled. }.
                    qq{Use IPv4 or install Socket6 or IO::Socket::INET6 if possible.\n});
        # The failure has been reported; falling through would try to load
        # IO::Socket::INET6 anyway and raise a second error for the same attempt.
        return;
    }

    $this->_try_connect_tcp('IO::Socket::INET6');
}
210
# Try a non-blocking TCP connect to the current queue entry.
#
#   $package    - socket class to load and instantiate (IO::Socket::INET or
#                 IO::Socket::INET6 for the callers in this file)
#   $addr       - accepted for interface compatibility; not used here
#   %additional - extra constructor arguments passed through to $package->new
#
# Outcomes:
#   * package cannot be loaded / socket cannot be created -> error callback
#   * connect succeeds at once                            -> attach + _call
#   * connect is still in progress (EINPROGRESS or EWOULDBLOCK)
#                                                         -> attach + install
#   * any other connect failure -> warning, then the next queue entry
sub _try_connect_tcp {
    my ($this, $package, $addr, %additional) = @_;

    if (!eval("require $package")) {
        $this->_connect_error("Couldn\'t require socket package: $package");
        return;
    }
    my $sock = $package->new(
        %additional,
        (defined $this->{bind_addr} ?
             (LocalAddr => $this->{bind_addr}) : ()),
        Timeout => undef,
        Proto => 'tcp');
    if (!defined $sock) {
        $this->_connect_error(
            $this->sock_errno_to_msg($!, 'Couldn\'t prepare socket'),
            $!);
        return;
    }
    if (!defined $sock->blocking(0)) {
        # effect only on connecting; comment out
        #$this->_warn('cannot non-blocking') if ::debug_mode();

        if ($this->_is_winsock) {
            # winsock FIONBIO
            my $FIONBIO = 0x8004667e; # from Winsock2.h
            my $temp = chr(1);
            my $retval = $sock->ioctl($FIONBIO, $temp);
            if (!$retval) {
                $this->_warn($this->sock_errno_to_msg(
                    $!, 'Couldn\'t set non-blocking mode (winsock2)'), $!);
            }
        } else {
            $this->_warn($this->sock_errno_to_msg(
                $!, 'Couldn\'t set non-blocking mode (general)'), $!);
        }
    }
    # NOTE(review): presumably this yields the packed socket address for
    # addr/port; the answer status is not checked here.
    my $saddr = Tiarra::Resolver->resolve(
        'saddr', [$this->current_addr, $this->current_port],
        sub {}, 0);
    $this->{connecting}->{saddr} = $saddr->answer_data;

    # Capture the result and errno once.  $! is only meaningful when connect
    # failed; after a successful call it may still hold a stale EINPROGRESS
    # or EWOULDBLOCK that must not be mistaken for a pending connect.
    my $connected = $sock->connect($this->{connecting}->{saddr});
    my $error = $!;
    my $in_progress = !$connected && ($!{EINPROGRESS} || $!{EWOULDBLOCK});
    if ($connected || $in_progress) {
        $this->attach($sock);
        $! = $error; # attach may have touched errno; restore it as before
        if ($in_progress) {
            $this->install;
        } else {
            $this->_call;
        }
    } else {
        $this->_connect_warn_try_next($error, 'connect error');
    }
}
266
# Unix domain socket attempt using IO::Socket::UNIX.
#
# When Tiarra::OptionalModules reports that unix domain sockets are
# unavailable, the error callback is invoked and the attempt stops there.
# Otherwise a successful connect is attached and reported via _call; a
# failed one produces a warning and moves on to the next queue entry.
sub _try_connect_unix {
    my $this = shift;

    if (!Tiarra::OptionalModules->unix_dom) {
        $this->_error(
            qq{Address $this->{addr} seems to be an Unix Domain Socket address, }.
                qq{but Unix Domain Socket support is not enabled. }.
                    qq{Use other protocol if possible.\n});
        # The failure has been reported; do not go on to attempt the connect
        # (which could fire further callbacks for the same attempt).
        return;
    }

    require IO::Socket::UNIX;
    my $sock = IO::Socket::UNIX->new(Peer => $this->{connecting}->{addr});
    if (defined $sock) {
        $this->attach($sock);
        $this->_call;
    } else {
        $this->_connect_warn_try_next($!, 'Couldn\'t connect');
    }
}
286
# Warn about a failed attempt, then move on to the next queue entry.
#
#   $errno - error value of the failed attempt
#   $msg   - short description; some callers in this file omit it
sub _connect_warn_try_next {
    my ($this, $errno, $msg) = @_;

    # the message text is derived from errno + description by sock_errno_to_msg
    $this->_connect_warn(
        $this->sock_errno_to_msg($errno, $msg),
        $errno,
    );
    return $this->_connect_try_next;
}
293
# Report a connection-level error; arguments are (message, optional errno).
sub _connect_error {
    my $this = shift;
    return $this->_connect_warn_or_error('error', @_);
}

# Report a connection-level warning; arguments are (message, optional errno).
sub _connect_warn {
    my $this = shift;
    return $this->_connect_warn_or_error('warn', @_);
}
296
# Shared implementation of _connect_warn / _connect_error.
#
#   $level - 'warn' or 'error'; selects the _warn or _error method
#   $str   - optional detail, appended after ': ' when defined
#   $errno - optional error value, passed along unchanged
# Any further arguments are forwarded as well.  The message always starts
# with "Couldn't connect to " followed by ->destination.
sub _connect_warn_or_error {
    my ($this, $level, $str, $errno, @rest) = @_;

    my $handler = "_$level";
    my $detail = defined $str ? ": $str" : '';

    return $this->$handler(
        "Couldn't connect to " . $this->destination . $detail, $errno, @rest);
}
310
# Description of the current target, as produced by repr_destination from
# the host plus the current address, port and socket type.
sub destination {
    my ($this) = @_;

    my @target = (
        host => $this->host,
        addr => $this->current_addr,
        port => $this->current_port,
        type => $this->current_type,
    );
    return $this->repr_destination(@target);
}
320
# Address of the attempt in progress, falling back to the configured addr.
sub current_addr {
    my ($this) = @_;

    return utils->get_first_defined($this->{connecting}->{addr}, $this->addr);
}
328
# Port of the attempt in progress, falling back to the configured port.
sub current_port {
    my ($this) = @_;

    return utils->get_first_defined($this->{connecting}->{port}, $this->port);
}
336
# Socket type of the attempt in progress (undef when there is none).
sub current_type {
    my ($this) = @_;

    return $this->{connecting}->{type};
}
342
# Connection error: hands ('error', connector, message, errno) to the user
# callback.  Marks the end of the ->connect chain.
sub _error {
    my ($this, $msg, $errno) = @_;

    my $notify = $this->callback;
    return $notify->('error', $this, $msg, $errno);
}
349
# Connection warning: hands ('warn', connector, message, errno) to the user
# callback.  Connecting continues afterwards.
sub _warn {
    my ($this, $msg, $errno) = @_;

    my $notify = $this->callback;
    return $notify->('warn', $this, $msg, $errno);
}
356
# Connection established: hands ('sock', connector, socket) to the user
# callback.
sub _call {
    my ($this) = @_;

    return $this->callback->('sock', $this, $this->sock);
}
363
# Drop the connector's pending registrations: uninstall it when it is
# installed, and uninstall and forget the current timer, if any.
sub cleanup {
    my ($this) = @_;

    $this->uninstall if $this->installed;

    if (defined $this->{timer}) {
        $this->{timer}->uninstall;
        $this->{timer} = undef;
    }
}
375
# Abort the attempt: clean up, close the socket if there is one, and notify
# the user callback.
#
#   $genre - callback genre to report; defaults to 'interrupt'
#            (connect's timeout timer passes 'timeout')
sub interrupt {
    my ($this, $genre) = @_;

    $this->cleanup;
    $this->close if defined $this->sock;

    my $kind = defined $genre ? $genre : 'interrupt';
    return $this->callback->($kind, $this);
}
386
# Always true: the connector always asks to be told when its socket is
# writable.
sub want_to_write {
    return 1;
}
390
# Socket event handlers.  Write and read readiness are both handled by
# proc_sock; an exceptional condition is handled as a socket error.
sub write {
    my $this = shift;
    return $this->proc_sock('write');
}

sub read {
    my $this = shift;
    return $this->proc_sock('read');
}

sub exception {
    my $this = shift;
    return $this->_handle_sock_error;
}
394
# Handle a readiness event on the connecting socket.
#
#   $state - 'write' or 'read' (as passed by ->write / ->read)
#
# 'write': the connect has finished.  A non-zero ->errno means it failed
# (close, warn, try the next entry); otherwise the socket is delivered.
# Any other state: connect is called again to probe the socket.  EISCONN
# (or, on winsock, error 10022 / EWOULDBLOCK / EALREADY) counts as
# connected; other failures are warned about and handled as socket errors.
# If the probe succeeds but the socket is not writable, the attempt is
# abandoned; otherwise only the first 'read' event is silently ignored.
sub proc_sock {
    my ($this, $state) = @_;

    if ($state eq 'write') {
        my $errno = $this->errno;
        $this->cleanup;
        if (!$errno) {
            $this->_call;
        } else {
            $this->close;
            $this->_connect_warn_try_next($errno);
        }
    } elsif (!$this->sock->connect($this->{connecting}->{saddr})) {
        my $already_connected = $!{EISCONN}
            || ($this->_is_winsock
                    && (($! == 10022) || $!{EWOULDBLOCK} || $!{EALREADY}));
        if ($already_connected) {
            $this->cleanup;
            $this->_call;
        } else {
            $this->_warn(
                $this->sock_errno_to_msg($!, 'connection try error'), $!);
            $this->_handle_sock_error;
        }
    } elsif (!IO::Select->new($this->sock)->can_write(0)) {
        $this->_warn('cannot write socket error');
        my $errno = $this->errno;
        $this->cleanup;
        $this->close;
        $this->_connect_warn_try_next($errno, "cant write on $state");
    } elsif ($state ne 'read' || $this->{unexpected_want_to_read_count}++) {
        # ignore first ready-to-read
        $this->_warn("connect successful, why called this on $state?");
    }
}
432
# The current socket failed: read its error value first, then clean up,
# close it, warn, and continue with the next queue entry.
sub _handle_sock_error {
    my ($this) = @_;

    # ->errno is read before cleanup/close, while the socket is still attached
    my $errno = $this->errno;
    $this->cleanup;
    $this->close;
    return $this->_connect_warn_try_next($errno);
}
441
4421;
Note: See TracBrowser for help on using the browser.