Changeset 21965
- Timestamp:
- 10/23/08 17:05:18 (5 years ago)
- Location:
- lang/c/mpio/trunk
- Files:
-
- 5 modified
-
mp/fdnotify_impl.h (modified) (2 diffs)
-
mp/iothreads/reader.pre.h (modified) (8 diffs)
-
src/iothreads_connector.cc (modified) (2 diffs)
-
src/iothreads_listener.cc (modified) (2 diffs)
-
src/iothreads_reader.cc (modified) (12 diffs)
Legend:
- Unmodified
- Added
- Removed
-
lang/c/mpio/trunk/mp/fdnotify_impl.h
r21348 r21965 64 64 inline void fdnotify<NotifyObject>::send(const NotifyObject& obj) 65 65 { 66 ssize_t len; 67 while(1) { 68 len = ::write(m_pipe[1], (void*)&obj, sizeof(NotifyObject)); 69 if(len >= static_cast<ssize_t>(sizeof(NotifyObject))) { 70 return; 71 } 66 size_t offset = 0; 67 do { 68 ssize_t len = ::write(m_pipe[1], ((const char*)&obj)+offset, 69 sizeof(NotifyObject) - offset); 72 70 if(len < 0) { 73 71 if(errno != EAGAIN && errno != EINTR) { … … 77 75 throw fdnotify_error(errno, "write-side pipe is closed"); 78 76 } 79 } 77 offset += len; 78 } while(offset < sizeof(NotifyObject)); 80 79 } 81 80 -
lang/c/mpio/trunk/mp/iothreads/reader.pre.h
r21242 r21965 57 57 template <typename Handler> 58 58 static void add(int fd); 59 60 59 MP_ARGS_BEGIN 61 60 template <typename Handler, MP_ARGS_TEMPLATE> … … 66 65 67 66 typedef function<void (handler&)> message_t; 68 static void send_message(int fd, message_t* msg); 67 68 template <typename F> 69 static void send_message(int fd, F f); 70 MP_ARGS_BEGIN 71 template <typename F, MP_ARGS_TEMPLATE> 72 static void send_message(int fd, F f, MP_ARGS_PARAMS); 73 MP_ARGS_END 69 74 70 75 private: … … 86 91 87 92 public: 88 template <typename Handler> 89 void add(int fd); 90 91 MP_ARGS_BEGIN 92 template <typename Handler, MP_ARGS_TEMPLATE> 93 void add(int fd, MP_ARGS_PARAMS); 94 MP_ARGS_END 95 93 void add(handler* newh); 96 94 void close(int fd); 97 98 void send_message(int fd, message_t* msg); 95 void send_message(int fd, message_t* newmsg); 99 96 100 97 private: … … 103 100 104 101 private: 105 void add_impl(handler* h);106 102 worker& worker_of(int fd); 107 103 … … 115 111 inline void reader::add(int fd) 116 112 { 117 instance().add <Handler>(fd);113 instance().add(new Handler(fd)); 118 114 } 119 120 115 MP_ARGS_BEGIN 121 116 template <typename Handler, MP_ARGS_TEMPLATE> 122 117 inline void reader::add(int fd, MP_ARGS_PARAMS) 123 118 { 124 instance().add <Handler, MP_ARGS_TYPES>(fd, MP_ARGS_FUNC);119 instance().add(new Handler(fd, MP_ARGS_FUNC)); 125 120 } 126 121 MP_ARGS_END … … 131 126 } 132 127 133 inline void reader::send_message(int fd, message_t* msg) 128 template <typename F> 129 inline void reader::send_message(int fd, F f) 134 130 { 135 instance().send_message(fd, msg);131 instance().send_message(fd, new message_t(f)); 136 132 } 137 138 139 template <typename Handler> 140 inline void reader::impl::add(int fd) 133 MP_ARGS_BEGIN 134 template <typename F, MP_ARGS_TEMPLATE> 135 inline void reader::send_message(int fd, F f, MP_ARGS_PARAMS) 141 136 { 142 handler* h = NULL; 143 try { h = new Handler(fd); } 144 catch (...) { ::close(fd); } 145 try { add_impl(h); } 146 catch (...) { ::close(fd); delete h; throw; } 147 } 148 149 MP_ARGS_BEGIN 150 template <typename Handler, MP_ARGS_TEMPLATE> 151 inline void reader::impl::add(int fd, MP_ARGS_PARAMS) 152 { 153 handler* h = NULL; 154 try { h = new Handler(fd, MP_ARGS_FUNC); } 155 catch (...) { ::close(fd); } 156 try { add_impl(h); } 157 catch (...) { ::close(fd); delete h; throw; } 137 instance().send_message(fd, new message_t(bind(f, MP_ARGS_FUNC))); 158 138 } 159 139 MP_ARGS_END … … 165 145 reader::add<Handler>(fd); 166 146 } 167 168 147 MP_ARGS_BEGIN 169 148 template <typename Handler, MP_ARGS_TEMPLATE> 170 149 inline void add(int fd, MP_ARGS_PARAMS) 171 150 { 172 reader::add<Handler >(fd, MP_ARGS_FUNC);151 reader::add<Handler, MP_ARGS_TYPES>(fd, MP_ARGS_FUNC); 173 152 } 174 153 MP_ARGS_END … … 179 158 } 180 159 160 template <typename F> 161 inline void send_message(int fd, F f) 162 { 163 reader::send_message<F>(fd, f); 164 } 181 165 MP_ARGS_BEGIN 182 template < MP_ARGS_TEMPLATE>183 inline void send_message(int fd, MP_ARGS_PARAMS)166 template <typename F, MP_ARGS_TEMPLATE> 167 inline void send_message(int fd, F f, MP_ARGS_PARAMS) 184 168 { 185 reader::send_message(fd, 186 new reader::message_t(bind(MP_ARGS_FUNC))); 169 reader::send_message<F, MP_ARGS_TYPES>(fd, f, MP_ARGS_FUNC); 187 170 } 188 171 MP_ARGS_END -
lang/c/mpio/trunk/src/iothreads_connector.cc
r21348 r21965 199 199 int sock = ::socket(req.addr()->sa_family, SOCK_STREAM, 0); 200 200 if(sock < 0) { 201 req.callback(-errno); 201 sock = -errno; if(sock >= 0) { sock = -EINVAL; } 202 req.callback(sock); 202 203 return; 203 204 } … … 205 206 if(::connect(sock, req.addr(), req.addrlen()) < 0) { 206 207 ::close(sock); 207 req.callback(-errno); 208 sock = -errno; if(sock >= 0) { sock = -EINVAL; } 209 req.callback(sock); 208 210 return; 209 211 } -
lang/c/mpio/trunk/src/iothreads_listener.cc
r21348 r21965 187 187 m_ev.remove(lsock, EV_READ); 188 188 ::close(lsock); 189 fd = -errno; if(fd >= 0) { fd = -EINVAL; } 190 req.callback(fd); 189 191 return; 190 192 } … … 192 194 m_ev.remove(lsock, EV_READ); 193 195 ::close(lsock); 196 fd = -errno; if(fd >= 0) { fd = -EINVAL; } 197 req.callback(fd); 194 198 return; 195 199 } -
lang/c/mpio/trunk/src/iothreads_reader.cc
r21348 r21965 43 43 44 44 private: 45 void try_read(handler* h);46 47 private:48 45 typedef enum { 49 46 ADD_HANDLER, 50 CLOSE_HANDLER,47 REMOVE_HANDLER, 51 48 MESSAGE, 52 49 } notify_type_t; … … 56 53 union { 57 54 handler* h; // ADD_HANDLER 58 int fd; // CLOSE_HANDLER55 int fd; // REMOVE_HANDLER 59 56 struct { // MESSAGE 60 message_t* func;57 message_t* msg; 61 58 int fd; 62 59 } message; … … 68 65 69 66 private: 70 void add_handler(handler* h);71 void close_handler(handler* h);72 73 private:74 67 typedef event<handler*> ev_t; 75 68 ev_t m_ev; 69 70 private: 71 void try_read(handler* h); 72 73 void add_handler(handler* newh); 74 void remove_handler(handler* h); 75 void run_on_handler(handler* h, message_t* msg); 76 76 }; 77 77 … … 104 104 } 105 105 106 reader::impl::~impl() {} 107 108 109 void reader::impl::add_impl(handler* h) 110 { 111 worker_of(h->fd()).add(h); 106 reader::impl::~impl() { } // FIXME handlers will leak 107 108 109 void reader::impl::add(handler* newh) 110 try { 111 worker_of(newh->fd()).add(newh); 112 } catch (...) { 113 delete newh; 114 throw; 112 115 } 113 116 … … 117 120 } 118 121 119 void reader::impl::send_message(int fd, message_t* msg) 120 { 121 worker_of(fd).send_message(fd, msg); 122 void reader::impl::send_message(int fd, message_t* newmsg) 123 try { 124 worker_of(fd).send_message(fd, newmsg); 125 } catch (...) { 126 delete newmsg; 127 throw; 122 128 } 123 129 … … 130 136 reader::impl::worker::worker() 131 137 { 132 if( m_ev.add(m_notify.getfd(), EV_READ ) < 0 ) {138 if( m_ev.add(m_notify.getfd(), EV_READ, (handler*)NULL) < 0 ) { 133 139 throw event_error(errno, "iothreads reader failed to initialize event notifier"); 134 140 } … … 147 153 inline void reader::impl::worker::close(int fd) 148 154 { 149 notify_entry e = { CLOSE_HANDLER };155 notify_entry e = { REMOVE_HANDLER }; 150 156 e.as.fd = fd; 151 157 m_notify.send(e); … … 155 161 { 156 162 notify_entry e = { MESSAGE }; 157 e.as.message. func= msg;163 e.as.message.msg = msg; 158 164 e.as.message.fd = fd; 159 try { 160 m_notify.send(e); 161 } catch (...) { 162 delete msg; 163 throw; 164 } 165 m_notify.send(e); 165 166 } 166 167 … … 182 183 } 183 184 } else { 184 handler* h = m_ev.data(fd); 185 try_read(h); 185 try_read(m_ev.data(fd)); 186 186 } 187 187 } … … 191 191 } 192 192 193 namespace {194 static void close_delete(int fd, handler* h) { ::close(fd); delete h; }195 }196 197 193 inline void reader::impl::worker::try_read(handler* h) 198 { 199 try { 200 h->read_event(); 201 } catch (...) { 202 close_handler(h); 203 } 204 } 194 try { 195 h->read_event(); 196 } catch (...) { 197 remove_handler(h); 198 } 199 205 200 206 201 inline void reader::impl::worker::notify_impl(notify_entry& e) … … 210 205 add_handler(h); 211 206 212 } else if(e.type == CLOSE_HANDLER) {207 } else if(e.type == REMOVE_HANDLER) { 213 208 int fd = e.as.fd; 214 209 if(!m_ev.test(fd)) { return; } 215 close_handler(m_ev.data(fd));210 remove_handler(m_ev.data(fd)); 216 211 217 212 } else { // e.type == MESSAGE 218 213 int fd = e.as.message.fd; 219 std::auto_ptr<message_t> func(e.as.message.func);214 std::auto_ptr<message_t> msg(e.as.message.msg); 220 215 if(!m_ev.test(fd)) { return; } 221 (*func)(*m_ev.data(fd)); 222 } 223 } 224 225 inline void reader::impl::worker::add_handler(handler* h) 226 { 227 try { 228 if( m_ev.add(h->fd(), EV_READ, h) < 0 ) { 229 throw event_error(errno, "iothreads reader failed to add event"); 230 } 231 } catch (...) { 232 ::close(h->fd()); 233 delete h; 234 } 235 } 236 237 inline void reader::impl::worker::close_handler(handler* h) 238 { 216 run_on_handler(m_ev.data(fd), msg.get()); 217 } 218 } 219 220 namespace { 221 static void close_delete_handler(int fd, handler* h) { 222 ::close(fd); 223 delete h; 224 } 225 } 226 227 inline void reader::impl::worker::add_handler(handler* newh) 228 try { 229 if( m_ev.add(newh->fd(), EV_READ, newh) < 0 ) { 230 throw event_error(errno, "iothreads reader failed to add event"); 231 } 232 // FIXME call newh->connected()? 233 } catch (...) { 234 ::close(newh->fd()); 235 delete newh; 236 // FIXME log? 237 } 238 239 void reader::impl::worker::remove_handler(handler* h) 240 try { 239 241 m_ev.remove(h->fd(), EV_READ); 240 //::close(fd); // FIXME close はロジックスレッドで242 //::close(fd); // FIXME close(2)はロジックスレッドで 241 243 // 1. fdに対してイベント到着 242 244 // 2. 直後にclose … … 244 246 // 4. イベントに対して返信するためにsend_data 245 247 // 5. 期待とは異なるfdに返信されてしまう 246 try { 247 iothreads::submit(close_delete, h->fd(), h); 248 } catch (...) { 249 ::close(h->fd()); 250 delete h; 251 } 248 iothreads::submit(close_delete_handler, h->fd(), h); 249 } catch (...) { 250 ::close(h->fd()); 251 delete h; 252 } 253 254 inline void reader::impl::worker::run_on_handler(handler* h, message_t* msg) 255 try { 256 (*msg)(*h); 257 } catch (...) { 258 // FIXME log? 252 259 } 253 260
![(please configure the [header_logo] section in trac.ini)](/share/chrome/site/your_project_logo.png)