Changeset 21965

Show
Ignore:
Timestamp:
10/23/08 17:05:18 (5 years ago)
Author:
frsyuki
Message:

lang/c/mpio: fixed mp::fdnotify::send

Location:
lang/c/mpio/trunk
Files:
5 modified

Legend:

Unmodified
Added
Removed
  • lang/c/mpio/trunk/mp/fdnotify_impl.h

    r21348 r21965  
    6464inline void fdnotify<NotifyObject>::send(const NotifyObject& obj) 
    6565{ 
    66         ssize_t len; 
    67         while(1) { 
    68                 len = ::write(m_pipe[1], (void*)&obj, sizeof(NotifyObject)); 
    69                 if(len >= static_cast<ssize_t>(sizeof(NotifyObject))) { 
    70                         return; 
    71                 } 
     66        size_t offset = 0; 
     67        do { 
     68                ssize_t len = ::write(m_pipe[1], ((const char*)&obj)+offset, 
     69                                sizeof(NotifyObject) - offset); 
    7270                if(len < 0) { 
    7371                        if(errno != EAGAIN && errno != EINTR) { 
     
    7775                        throw fdnotify_error(errno, "write-side pipe is closed"); 
    7876                } 
    79         } 
     77                offset += len; 
     78        } while(offset < sizeof(NotifyObject)); 
    8079} 
    8180 
  • lang/c/mpio/trunk/mp/iothreads/reader.pre.h

    r21242 r21965  
    5757        template <typename Handler> 
    5858        static void add(int fd); 
    59  
    6059MP_ARGS_BEGIN 
    6160        template <typename Handler, MP_ARGS_TEMPLATE> 
     
    6665 
    6766        typedef function<void (handler&)> message_t; 
    68         static void send_message(int fd, message_t* msg); 
     67 
     68        template <typename F> 
     69        static void send_message(int fd, F f); 
     70MP_ARGS_BEGIN 
     71        template <typename F, MP_ARGS_TEMPLATE> 
     72        static void send_message(int fd, F f, MP_ARGS_PARAMS); 
     73MP_ARGS_END 
    6974 
    7075private: 
     
    8691 
    8792public: 
    88         template <typename Handler> 
    89         void add(int fd); 
    90  
    91 MP_ARGS_BEGIN 
    92         template <typename Handler, MP_ARGS_TEMPLATE> 
    93         void add(int fd, MP_ARGS_PARAMS); 
    94 MP_ARGS_END 
    95  
     93        void add(handler* newh); 
    9694        void close(int fd); 
    97  
    98         void send_message(int fd, message_t* msg); 
     95        void send_message(int fd, message_t* newmsg); 
    9996 
    10097private: 
     
    103100 
    104101private: 
    105         void add_impl(handler* h); 
    106102        worker& worker_of(int fd); 
    107103 
     
    115111inline void reader::add(int fd) 
    116112{ 
    117         instance().add<Handler>(fd); 
     113        instance().add(new Handler(fd)); 
    118114} 
    119  
    120115MP_ARGS_BEGIN 
    121116template <typename Handler, MP_ARGS_TEMPLATE> 
    122117inline void reader::add(int fd, MP_ARGS_PARAMS) 
    123118{ 
    124         instance().add<Handler, MP_ARGS_TYPES>(fd, MP_ARGS_FUNC); 
     119        instance().add(new Handler(fd, MP_ARGS_FUNC)); 
    125120} 
    126121MP_ARGS_END 
     
    131126} 
    132127 
    133 inline void reader::send_message(int fd, message_t* msg) 
     128template <typename F> 
     129inline void reader::send_message(int fd, F f) 
    134130{ 
    135         instance().send_message(fd, msg); 
     131        instance().send_message(fd, new message_t(f)); 
    136132} 
    137  
    138  
    139 template <typename Handler> 
    140 inline void reader::impl::add(int fd) 
     133MP_ARGS_BEGIN 
     134template <typename F, MP_ARGS_TEMPLATE> 
     135inline void reader::send_message(int fd, F f, MP_ARGS_PARAMS) 
    141136{ 
    142         handler* h = NULL; 
    143         try { h = new Handler(fd); } 
    144         catch (...) { ::close(fd); } 
    145         try { add_impl(h); } 
    146         catch (...) { ::close(fd); delete h; throw; } 
    147 } 
    148  
    149 MP_ARGS_BEGIN 
    150 template <typename Handler, MP_ARGS_TEMPLATE> 
    151 inline void reader::impl::add(int fd, MP_ARGS_PARAMS) 
    152 { 
    153         handler* h = NULL; 
    154         try { h = new Handler(fd, MP_ARGS_FUNC); } 
    155         catch (...) { ::close(fd); } 
    156         try { add_impl(h); } 
    157         catch (...) { ::close(fd); delete h; throw; } 
     137        instance().send_message(fd, new message_t(bind(f, MP_ARGS_FUNC))); 
    158138} 
    159139MP_ARGS_END 
     
    165145        reader::add<Handler>(fd); 
    166146} 
    167  
    168147MP_ARGS_BEGIN 
    169148template <typename Handler, MP_ARGS_TEMPLATE> 
    170149inline void add(int fd, MP_ARGS_PARAMS) 
    171150{ 
    172         reader::add<Handler>(fd, MP_ARGS_FUNC); 
     151        reader::add<Handler, MP_ARGS_TYPES>(fd, MP_ARGS_FUNC); 
    173152} 
    174153MP_ARGS_END 
     
    179158} 
    180159 
     160template <typename F> 
     161inline void send_message(int fd, F f) 
     162{ 
     163        reader::send_message<F>(fd, f); 
     164} 
    181165MP_ARGS_BEGIN 
    182 template <MP_ARGS_TEMPLATE> 
    183 inline void send_message(int fd, MP_ARGS_PARAMS) 
     166template <typename F, MP_ARGS_TEMPLATE> 
     167inline void send_message(int fd, F f, MP_ARGS_PARAMS) 
    184168{ 
    185         reader::send_message(fd, 
    186                         new reader::message_t(bind(MP_ARGS_FUNC))); 
     169        reader::send_message<F, MP_ARGS_TYPES>(fd, f, MP_ARGS_FUNC); 
    187170} 
    188171MP_ARGS_END 
  • lang/c/mpio/trunk/src/iothreads_connector.cc

    r21348 r21965  
    199199        int sock = ::socket(req.addr()->sa_family, SOCK_STREAM, 0); 
    200200        if(sock < 0) { 
    201                 req.callback(-errno); 
     201                sock = -errno;  if(sock >= 0) { sock = -EINVAL; } 
     202                req.callback(sock); 
    202203                return; 
    203204        } 
     
    205206        if(::connect(sock, req.addr(), req.addrlen()) < 0) { 
    206207                ::close(sock); 
    207                 req.callback(-errno); 
     208                sock = -errno;  if(sock >= 0) { sock = -EINVAL; } 
     209                req.callback(sock); 
    208210                return; 
    209211        } 
  • lang/c/mpio/trunk/src/iothreads_listener.cc

    r21348 r21965  
    187187                        m_ev.remove(lsock, EV_READ); 
    188188                        ::close(lsock); 
     189                        fd = -errno;  if(fd >= 0) { fd = -EINVAL; } 
     190                        req.callback(fd); 
    189191                        return; 
    190192                } 
     
    192194                m_ev.remove(lsock, EV_READ); 
    193195                ::close(lsock); 
     196                fd = -errno;  if(fd >= 0) { fd = -EINVAL; } 
     197                req.callback(fd); 
    194198                return; 
    195199        } 
  • lang/c/mpio/trunk/src/iothreads_reader.cc

    r21348 r21965  
    4343 
    4444private: 
    45         void try_read(handler* h); 
    46  
    47 private: 
    4845        typedef enum { 
    4946                ADD_HANDLER, 
    50                 CLOSE_HANDLER, 
     47                REMOVE_HANDLER, 
    5148                MESSAGE, 
    5249        } notify_type_t; 
     
    5653                union { 
    5754                        handler* h;             // ADD_HANDLER 
    58                         int fd;                 // CLOSE_HANDLER 
     55                        int fd;                 // REMOVE_HANDLER 
    5956                        struct {                // MESSAGE 
    60                                 message_t* func; 
     57                                message_t* msg; 
    6158                                int fd; 
    6259                        } message; 
     
    6865 
    6966private: 
    70         void add_handler(handler* h); 
    71         void close_handler(handler* h); 
    72  
    73 private: 
    7467        typedef event<handler*> ev_t; 
    7568        ev_t m_ev; 
     69 
     70private: 
     71        void try_read(handler* h); 
     72 
     73        void add_handler(handler* newh); 
     74        void remove_handler(handler* h); 
     75        void run_on_handler(handler* h, message_t* msg); 
    7676}; 
    7777 
     
    104104} 
    105105 
    106 reader::impl::~impl() {} 
    107  
    108  
    109 void reader::impl::add_impl(handler* h) 
    110 { 
    111         worker_of(h->fd()).add(h); 
     106reader::impl::~impl() { }  // FIXME handlers will leak 
     107 
     108 
     109void reader::impl::add(handler* newh) 
     110try { 
     111        worker_of(newh->fd()).add(newh); 
     112} catch (...) { 
     113        delete newh; 
     114        throw; 
    112115} 
    113116 
     
    117120} 
    118121 
    119 void reader::impl::send_message(int fd, message_t* msg) 
    120 { 
    121         worker_of(fd).send_message(fd, msg); 
     122void reader::impl::send_message(int fd, message_t* newmsg) 
     123try { 
     124        worker_of(fd).send_message(fd, newmsg); 
     125} catch (...) { 
     126        delete newmsg; 
     127        throw; 
    122128} 
    123129 
     
    130136reader::impl::worker::worker() 
    131137{ 
    132         if( m_ev.add(m_notify.getfd(), EV_READ) < 0 ) { 
     138        if( m_ev.add(m_notify.getfd(), EV_READ, (handler*)NULL) < 0 ) { 
    133139                throw event_error(errno, "iothreads reader failed to initialize event notifier"); 
    134140        } 
     
    147153inline void reader::impl::worker::close(int fd) 
    148154{ 
    149         notify_entry e = { CLOSE_HANDLER }; 
     155        notify_entry e = { REMOVE_HANDLER }; 
    150156        e.as.fd = fd; 
    151157        m_notify.send(e); 
     
    155161{ 
    156162        notify_entry e = { MESSAGE }; 
    157         e.as.message.func = msg; 
     163        e.as.message.msg = msg; 
    158164        e.as.message.fd = fd; 
    159         try { 
    160                 m_notify.send(e); 
    161         } catch (...) { 
    162                 delete msg; 
    163                 throw; 
    164         } 
     165        m_notify.send(e); 
    165166} 
    166167 
     
    182183                                } 
    183184                        } else { 
    184                                 handler* h = m_ev.data(fd); 
    185                                 try_read(h); 
     185                                try_read(m_ev.data(fd)); 
    186186                        } 
    187187                } 
     
    191191} 
    192192 
    193 namespace { 
    194 static void close_delete(int fd, handler* h) { ::close(fd); delete h; } 
    195 } 
    196  
    197193inline void reader::impl::worker::try_read(handler* h) 
    198 { 
    199         try { 
    200                 h->read_event(); 
    201         } catch (...) { 
    202                 close_handler(h); 
    203         } 
    204 } 
     194try { 
     195        h->read_event(); 
     196} catch (...) { 
     197        remove_handler(h); 
     198} 
     199 
    205200 
    206201inline void reader::impl::worker::notify_impl(notify_entry& e) 
     
    210205                add_handler(h); 
    211206 
    212         } else if(e.type == CLOSE_HANDLER) { 
     207        } else if(e.type == REMOVE_HANDLER) { 
    213208                int fd = e.as.fd; 
    214209                if(!m_ev.test(fd)) { return; } 
    215                 close_handler(m_ev.data(fd)); 
     210                remove_handler(m_ev.data(fd)); 
    216211 
    217212        } else {  // e.type == MESSAGE 
    218213                int fd = e.as.message.fd; 
    219                 std::auto_ptr<message_t> func(e.as.message.func); 
     214                std::auto_ptr<message_t> msg(e.as.message.msg); 
    220215                if(!m_ev.test(fd)) { return; } 
    221                 (*func)(*m_ev.data(fd)); 
    222         } 
    223 } 
    224  
    225 inline void reader::impl::worker::add_handler(handler* h) 
    226 { 
    227         try { 
    228                 if( m_ev.add(h->fd(), EV_READ, h) < 0 ) { 
    229                         throw event_error(errno, "iothreads reader failed to add event"); 
    230                 } 
    231         } catch (...) { 
    232                 ::close(h->fd()); 
    233                 delete h; 
    234         } 
    235 } 
    236  
    237 inline void reader::impl::worker::close_handler(handler* h) 
    238 { 
     216                run_on_handler(m_ev.data(fd), msg.get()); 
     217        } 
     218} 
     219 
     220namespace { 
     221static void close_delete_handler(int fd, handler* h) { 
     222        ::close(fd); 
     223        delete h; 
     224} 
     225} 
     226 
     227inline void reader::impl::worker::add_handler(handler* newh) 
     228try { 
     229        if( m_ev.add(newh->fd(), EV_READ, newh) < 0 ) { 
     230                throw event_error(errno, "iothreads reader failed to add event"); 
     231        } 
     232        // FIXME call newh->connected()? 
     233} catch (...) { 
     234        ::close(newh->fd()); 
     235        delete newh; 
     236        // FIXME log? 
     237} 
     238 
     239void reader::impl::worker::remove_handler(handler* h) 
     240try { 
    239241        m_ev.remove(h->fd(), EV_READ); 
    240         //::close(fd);  // FIXME closeはロジックスレッドで 
     242        //::close(fd);  // FIXME close(2)はロジックスレッドで 
    241243        // 1. fdに対してイベント到着 
    242244        // 2. 直後にclose 
     
    244246        // 4. イベントに対して返信するためにsend_data 
    245247        // 5. 期待とは異なるfdに返信されてしまう 
    246         try { 
    247                 iothreads::submit(close_delete, h->fd(), h); 
    248         } catch (...) { 
    249                 ::close(h->fd()); 
    250                 delete h; 
    251         } 
     248        iothreads::submit(close_delete_handler, h->fd(), h); 
     249} catch (...) { 
     250        ::close(h->fd()); 
     251        delete h; 
     252} 
     253 
     254inline void reader::impl::worker::run_on_handler(handler* h, message_t* msg) 
     255try { 
     256        (*msg)(*h); 
     257} catch (...) { 
     258        // FIXME log? 
    252259} 
    253260