Changeset 19043
- Timestamp:
- 09/09/08 20:46:02 (5 years ago)
- Location:
- lang/c/mpio/trunk
- Files:
-
- 4 added
- 11 modified
-
mp/iothreads.pre.h (modified) (5 diffs)
-
mp/iothreads/connect.h (modified) (1 diff)
-
mp/iothreads/listen.h (modified) (2 diffs)
-
mp/iothreads/reader.h (modified) (6 diffs)
-
mp/iothreads/timer.h (modified) (2 diffs)
-
mp/iothreads/writer.h (modified) (3 diffs)
-
mp/iothreads_impl.pre.h (modified) (6 diffs)
-
mp/multiplex.pre.h (modified) (1 diff)
-
mp/multiplex_impl.pre.h (modified) (3 diffs)
-
mp/sparse_array_impl.pre.h (modified) (1 diff)
-
mp/utility.pre.h (modified) (2 diffs)
-
src/iothreads.cc (added)
-
src/iothreads_listen.cc (added)
-
src/iothreads_timer.cc (added)
-
src/iothreads_writer.cc (added)
Legend:
- Unmodified
- Added
- Removed
-
lang/c/mpio/trunk/mp/iothreads.pre.h
r18476 r19043 25 25 #include "mp/zone.h" 26 26 #include "mp/callback_message.h" 27 #include "mp/utility.h"27 #include <memory> 28 28 29 29 … … 62 62 static void submit(callback_message* msg); 63 63 64 static void join(); 65 64 66 static void run(); 65 67 static void end(); 66 68 static bool is_end(); 67 68 // FIXME join69 69 70 70 private: … … 78 78 void submit_impl(callback_message* msg); 79 79 void run_impl(); 80 void join_impl(); 80 81 81 82 private: 82 83 blocking_vector<callback_message*> m_messages; 83 source<> m_source; 84 zone<source<> > m_zone; 85 86 typedef std::vector<callback_message*> messages_t; 84 zone<> m_zone; 87 85 88 86 private: 89 87 volatile sig_atomic_t m_end_flag; 88 typedef std::vector<pthread_base*> threads_t; 89 threads_t m_threads; 90 90 91 91 private: 92 typedef std::vector<callback_message*> messages_t; 92 93 void clear_message(messages_t& messages); 93 94 94 95 private: 95 static s coped_ptr<manager> s_instance;96 static std::auto_ptr<manager> s_instance; 96 97 static manager& instance() { return *s_instance; } 97 98 … … 101 102 }; 102 103 103 104 inline void initialize()105 { manager::initialize(); }106 107 inline void destroy()108 { manager::destroy(); }109 104 110 105 inline bool is_end() … … 147 142 #include "mp/iothreads/writer.h" 148 143 #include "mp/iothreads/listen.h" 144 //#include "mp/iothreads/connect.h" 149 145 #include "mp/iothreads/timer.h" 150 146 -
lang/c/mpio/trunk/mp/iothreads/connect.h
r18476 r19043 19 19 #ifndef MP_IOTHREADS_LISTEN_H__ 20 20 #define MP_IOTHREADS_LISTEN_H__ 21 22 #include <memory> 21 23 22 24 #include <unistd.h> -
lang/c/mpio/trunk/mp/iothreads/listen.h
r18476 r19043 20 20 #define MP_IOTHREADS_LISTEN_H__ 21 21 22 #include <unistd.h>23 #include <sys/types.h>24 #include <sys/socket.h>25 #include <errno.h>26 27 22 namespace mp { 28 23 namespace iothreads { … … 33 28 class listen_thread { 34 29 public: 35 listen_thread(int lsock, void (*callback)(void*, int fd), void* obj) : 36 m_lsock(lsock), m_callback(callback), m_obj(obj) {} 37 void operator() () 38 { 39 while(!iothreads::is_end()) { 40 int sock = ::accept(m_lsock, NULL, NULL); 41 if(sock < 0) { 42 if(errno == EINTR || errno == EAGAIN) { continue; } 43 // FIXME 44 throw std::runtime_error("socket broken"); 45 } else if(sock == 0) { 46 // FIXME 47 throw std::runtime_error("socket broken"); 48 } 49 iothreads::submit(m_callback, m_obj, sock); 50 } 51 } 30 listen_thread(int lsock, 31 void (*callback)(void*, int fd), void* obj); 32 void operator() (); 52 33 private: 53 34 int m_lsock; -
lang/c/mpio/trunk/mp/iothreads/reader.h
r18476 r19043 37 37 { return m_fd; } 38 38 void send_data(const char* buf, size_t len) 39 { return reader::send_data(m_fd, buf, len); }39 { return writer::send_data(m_fd, buf, len); } 40 40 private: 41 41 int m_fd; … … 72 72 MP_ARGS_END 73 73 74 handler& data_impl(int fd);75 void send_data_impl(int fd, const char* buf, size_t len);76 74 //void send_message_impl(int fd, callback_message* msg); 77 75 … … 90 88 class cb_t { 91 89 public: 92 cb_t(handler* h , short ev);90 cb_t(handler* h); 93 91 ~cb_t(); 94 92 public: 95 short event() const;96 void event(short ev);97 93 handler& get(); 98 94 private: 99 95 handler* m_handler; 100 short m_event;101 96 }; 102 97 … … 105 100 ev_t m_ev; 106 101 107 fdnotify< > m_notify; // FIXME102 fdnotify<handler*> m_notify; 108 103 109 104 private: … … 118 113 private: 119 114 // inter-thread impl 120 void msg_add(handler* h);121 void msg_send_message(int fd, callback_message* msg);115 void add_impl(handler* h); 116 void send_message_impl(int fd, callback_message* msg); 122 117 }; 123 118 … … 155 150 156 151 157 short reader::worker::cb_t::event() const 158 { 159 return m_event; 160 } 161 162 void reader::worker::cb_t::event(short ev) 163 { 164 m_event = ev; 165 } 152 template <typename Handler> 153 void reader::add(int fd) 154 { 155 instance().add_impl<Handler>(fd); 156 } 157 158 MP_ARGS_BEGIN 159 template <typename Handler, MP_ARGS_TEMPLATE> 160 void reader::add(int fd, MP_ARGS_PARAMS) 161 { 162 instance().add_impl<Handler>(fd, MP_ARGS_FUNC); 163 } 164 MP_ARGS_END 165 166 167 template <typename Handler> 168 void reader::add_impl(int fd) 169 { 170 handler* h = m_zone.allocate<Handler>(fd); 171 worker_of(fd).add(h); 172 h->connected(); 173 } 174 175 MP_ARGS_BEGIN 176 template <typename Handler, MP_ARGS_TEMPLATE> 177 void reader::add_impl(int fd, MP_ARGS_PARAMS) 178 { 179 handler* h = m_zone.allocate<Handler>(fd, MP_ARGS_FUNC); 180 worker_of(fd).add(h); 181 h->connected(); 182 } 183 MP_ARGS_END 184 185 void reader::worker::add(handler* h) 186 { 187 m_notify.send(h); 188 } 189 190 handler& reader::worker::cb_t::cb_t(handler* h) : 191 m_handler(h) {} 192 193 handler& reader::worker::cb_t::~cb_t() {} 166 194 167 195 handler& reader::worker::cb_t::get() 168 196 { 169 197 return *m_handler; 198 } 199 200 201 void reader::worker::operator() () 202 { 203 cb_t* pcb; 204 int fd; 205 short event; 206 int ret; 207 while(!m_end_flag) { 208 while( m_ev.next(&fd, &event, &pcb) ) { 209 if(fd == m_notify.getfd()) { 210 handler* h; 211 while(m_notify.try_receive(&h)) { 212 add_impl(h); 213 } 214 } else { 215 try_read(*pcb); 216 } 217 } 218 if( (ret = m_ev.wait()) < 0 ) { return ret; } 219 } 220 return 0; 221 } 222 223 void reader::worker::add_impl(handler* h) 224 { 225 m_ev.add(h->fd(), EV_READ, h); 226 } 227 228 void reader::worker::try_read(cb_t& cb) 229 { 230 try { 231 cb.get().read_event(); 232 } catch (...) { 233 // FIXME log 234 close_connection(cb); 235 } 236 } 237 238 void reader::worker::close_connection(cb_t& cb) 239 { 240 int fd = cb.get().fd(); 241 m_ev.remove(fd, EV_READ); 242 ::close(fd); 243 iothreads::submit( 244 &object_callback<void (void)>::mem_fun<reader, &reader::remove>, 245 &cb.get()); 246 // FIXME delete handler 170 247 } 171 248 -
lang/c/mpio/trunk/mp/iothreads/timer.h
r18476 r19043 20 20 #define MP_IOTHREADS_TIMER_H__ 21 21 22 #include <unistd.h>23 24 22 namespace mp { 25 23 namespace iothreads { … … 30 28 class timer_thread { 31 29 public: 32 timer_thread(double interval_sec, void (*callback)(void*), void* obj) : 33 m_interval_usec(interval_sec * 1000 * 1000), 34 m_callback(callback), 35 m_obj(obj) {} 36 void operator() () 37 { 38 while(!iothreads::is_end()) { 39 usleep(m_interval_usec); 40 iothreads::submit(m_callback, m_obj); 41 } 42 } 30 timer_thread(double interval_sec, 31 void (*callback)(void*), void* obj); 32 void operator() (); 43 33 private: 44 34 useconds_t m_interval_usec; -
lang/c/mpio/trunk/mp/iothreads/writer.h
r18476 r19043 20 20 #define MP_IOTHREADS_WRITER_H__ 21 21 22 #include "mp/event.h" 23 #include "mp/fdnotify.h" 24 #include <sys/types.h> 25 #include <sys/uio.h> 26 #include <unistd.h> 27 #include <errno.h> 22 #include <memory> 28 23 29 24 namespace mp { … … 35 30 static void initialize(unsigned int num_threads); 36 31 static void destroy(); 37 ~writer();38 32 39 33 public: 40 typedef void (*finalize_t)(void* user, int fd,const void* buf, size_t buflen);34 typedef void (*finalize_t)(void* user, const void* buf, size_t buflen); 41 35 42 36 static void send_data(int fd, const void* buf, size_t buflen); … … 48 42 finalize_t finalize, void* user, bool thread_safe = false); 49 43 50 private: 51 52 void send_data_impl(int fd, const void* buf, size_t buflen); 53 void send_data_impl(int fd, const void* buf, size_t buflen, finalize_t finalize, 54 void* user, bool thread_safe = false); 55 56 void send_data_thread_impl(int fd, const void* buf, size_t buflen); 57 void send_data_thread_impl(int fd, const void* buf, size_t buflen, 58 finalize_t finalize, void* user, bool thread_safe = false); 44 public: 45 static void finalize_free(void* user, const void* buf, size_t buflen) 46 { free((void*)buf); } 59 47 60 48 private: 61 class worker { 62 public: 63 worker(); 64 ~worker(); 65 66 public: 67 void add(int fd, char* ebit, 68 const void* buf, size_t buflen, size_t off, 69 finalize_t finalize, void* user, 70 bool thread_safe); 71 72 public: 73 void operator() (); 74 75 private: 76 struct notify_entry { 77 int fd; 78 char* ebit; 79 const void* buf; 80 size_t buflen; 81 size_t off; 82 finalize_t finalize; 83 void* user; 84 bool thread_safe; 85 }; 86 87 fdnotify<notify_entry> m_notify; 88 89 void add_impl(notify_entry& e); 90 91 private: 92 class request { 93 public: 94 request(char* ebit, const void* buf, size_t buflen, 95 finalize_t finalize, void* user, 96 bool thread_safe); 97 ~request(); 98 99 public: 100 void finalize(int fd); 101 102 private: 103 char* m_ebit; 104 const void* m_buf; 105 size_t m_buflen; 106 finalize_t m_finalize; 107 void* m_user; 108 bool m_thread_safe; 109 }; 110 111 class context { 112 public: 113 context(); 114 ~context(); 115 116 public: 117 bool empty(); 118 void add(char* ebit, 119 const void* buf, size_t buflen, size_t off, 120 finalize_t finalize, void* user, 121 bool thread_safe); 122 123 public: 124 iovec* vec(); 125 int veclen(); 126 void consumed(int fd, int num); 127 128 private: 129 typedef std::vector<iovec> bufvec_t; 130 bufvec_t m_bufvec; 131 132 typedef std::vector<request> reqvec_t; 133 reqvec_t m_reqvec; 134 }; 135 136 typedef event<context> ev_t; 137 ev_t m_ev; 138 }; 139 140 141 class empty_checker { 142 public: 143 empty_checker(); 144 ~empty_checker(); 145 public: 146 bool empty(); 147 char* use(); 148 private: 149 class chunk_t { 150 public: 151 chunk_t(); 152 public: 153 bool empty(); 154 char* use(); 155 private: 156 char m_bit[sizeof(uint64_t)]; 157 unsigned short m_used; 158 }; 159 typedef std::vector<chunk_t> bits_t; 160 bits_t m_bits; 161 }; 162 163 typedef sparse_array<empty_checker> empty_check_t; 164 empty_check_t m_empty_check; 165 166 std::vector<worker*> m_workers; 167 168 private: 169 worker& worker_of(int fd); 170 void add_data_with_copy(int fd, const void* buf, size_t buflen); 171 void add_data_with_finalize(int fd, const void* buf, size_t buflen, size_t off, 172 finalize_t finalize, void* user, bool thread_safe); 173 174 private: 175 writer(unsigned int num_threads); 176 177 static scoped_ptr<writer> s_instance; 178 static writer& instance(); 49 class impl; 50 static std::auto_ptr<impl> s_instance; 51 static impl& instance(); 179 52 180 53 private: 181 54 writer(); 55 ~writer(); 182 56 writer(const writer&); 183 184 private:185 static void finalize_free(void* user, int fd, const void* buf, size_t buflen)186 { free((void*)buf); }187 57 }; 188 58 189 59 190 void writer::initialize(unsigned int num_threads) 60 typedef writer::finalize_t finalize_t; 61 62 inline void send_data(int fd, const void* buf, size_t buflen) 191 63 { 192 s_instance.reset(new writer(num_threads));64 writer::send_data(fd, buf, buflen); 193 65 } 194 66 195 void writer::destroy() 67 inline void send_data(int fd, const void* buf, size_t buflen, finalize_t finalize, 68 void* user, bool thread_safe = false) 196 69 { 197 s_instance.reset(NULL);70 writer::send_data(fd, buf, buflen, finalize, user, thread_safe); 198 71 } 199 200 writer& writer::instance()201 {202 return *s_instance;203 }204 205 206 writer::writer(unsigned int num_threads)207 {208 m_workers.reserve(num_threads);209 for(unsigned int i=0; i < num_threads; ++i) {210 worker* w = iothreads::manager::add_thread<worker>();211 m_workers.push_back(w);212 }213 }214 215 writer::~writer() {}216 217 218 void writer::send_data(int fd, const void* buf, size_t buflen)219 {220 instance().send_data_impl(fd, buf, buflen);221 }222 223 void writer::send_data_impl(int fd, const void* buf, size_t buflen)224 {225 if(!m_empty_check.test(fd) || m_empty_check.data(fd).empty()) {226 ssize_t wl = ::write(fd, buf, buflen);227 if(wl < 0) {228 if(errno == EAGAIN || errno == EINTR) {229 add_data_with_copy(fd, buf, buflen);230 } else { return; }231 } else if(wl == 0) { return; }232 if(static_cast<size_t>(wl) < buflen) {233 add_data_with_copy(fd, (const void*)(((const char*)buf)+wl), buflen-wl);234 } else {235 return;236 }237 } else {238 add_data_with_copy(fd, buf, buflen);239 }240 }241 242 void writer::add_data_with_copy(int fd, const void* buf, size_t buflen)243 {244 if(!m_empty_check.test(fd)) { m_empty_check.set(fd); }245 void* nbuf = malloc(buflen);246 if(nbuf == NULL) { throw std::bad_alloc(); }247 memcpy(nbuf, buf, buflen);248 worker_of(fd).add(fd, m_empty_check.data(fd).use(),249 buf, buflen, 0,250 &writer::finalize_free, NULL,251 true);252 }253 254 void writer::send_data(int fd, const void* buf, size_t buflen, finalize_t finalize, void* user, bool thread_safe)255 {256 instance().send_data_impl(fd, buf, buflen,257 finalize, user, thread_safe);258 }259 260 void writer::send_data_impl(int fd, const void* buf, size_t buflen, finalize_t finalize, void* user, bool thread_safe)261 {262 if(!m_empty_check.test(fd) || m_empty_check.data(fd).empty()) {263 ssize_t wl = ::write(fd, buf, buflen);264 if(wl < 0) {265 if(errno == EAGAIN || errno == EINTR) {266 add_data_with_finalize(fd, buf, buflen, 0, finalize, user, thread_safe);267 } else { return; }268 } else if(wl == 0) { return; }269 if(static_cast<size_t>(wl) < buflen) {270 add_data_with_finalize(fd, buf, buflen, wl, finalize, user, thread_safe);271 } else {272 (*finalize)(user, fd, buf, buflen);273 }274 } else {275 add_data_with_finalize(fd, buf, buflen, 0, finalize, user, thread_safe);276 }277 }278 279 void writer::add_data_with_finalize(int fd, const void* buf, size_t buflen, size_t off,280 finalize_t finalize, void* user, bool thread_safe)281 {282 if(!m_empty_check.test(fd)) { m_empty_check.set(fd); }283 worker_of(fd).add(fd, m_empty_check.data(fd).use(),284 buf, buflen, off,285 finalize, user,286 thread_safe);287 }288 289 writer::worker& writer::worker_of(int fd)290 {291 return *m_workers[fd % m_workers.size()];292 }293 294 295 void writer::send_data_thread(int fd, const void* buf, size_t buflen)296 {297 instance().send_data_thread_impl(fd, buf, buflen);298 }299 300 void writer::send_data_thread_impl(int fd, const void* buf, size_t buflen)301 {302 add_data_with_copy(fd, buf, buflen);303 }304 305 void writer::send_data_thread(int fd, const void* buf, size_t buflen,306 finalize_t finalize, void* user, bool thread_safe)307 {308 instance().send_data_thread_impl(fd, buf, buflen,309 finalize, user, thread_safe);310 }311 312 void writer::send_data_thread_impl(int fd, const void* buf, size_t buflen,313 finalize_t finalize, void* user, bool thread_safe)314 {315 add_data_with_finalize(fd, buf, buflen, 0, finalize, user, thread_safe);316 }317 318 319 writer::empty_checker::empty_checker()320 {321 m_bits.resize(1);322 }323 324 writer::empty_checker::~empty_checker() { }325 326 327 bool writer::empty_checker::empty()328 {329 for(bits_t::iterator it(m_bits.begin()), it_end(m_bits.end());330 it != it_end;331 ++it) {332 if(!it->empty()) {333 return false;334 }335 }336 return true;337 }338 339 char* writer::empty_checker::use()340 {341 char* b = m_bits.back().use();342 if(b) { return b; }343 m_bits.push_back(chunk_t());344 return m_bits.back().use();345 }346 347 writer::empty_checker::chunk_t::chunk_t() :348 m_used(0)349 {350 memset(m_bit, 0, sizeof(m_bit));351 }352 353 bool writer::empty_checker::chunk_t::empty()354 {355 static const uint64_t ZERO = 0ULL;356 return (memcmp(m_bit, (char*)&ZERO, sizeof(m_bit)) == 0);357 }358 359 char* writer::empty_checker::chunk_t::use()360 {361 if(m_used >= sizeof(m_bit)) { return NULL; }362 char* b = &m_bit[m_used++];363 *b = 1;364 return b;365 }366 367 368 writer::worker::request::request(char* ebit, const void* buf, size_t buflen,369 finalize_t finalize, void* user,370 bool thread_safe) :371 m_ebit(ebit), m_buf(buf), m_buflen(buflen),372 m_finalize(finalize), m_user(user),373 m_thread_safe(thread_safe)374 { }375 376 writer::worker::request::~request() { }377 378 void writer::worker::request::finalize(int fd)379 {380 *m_ebit = 0;381 if(m_finalize) {382 if(m_thread_safe) {383 (*m_finalize)(m_user, fd, m_buf, m_buflen);384 } else {385 iothreads::submit(m_finalize, m_user, fd, m_buf, m_buflen);386 }387 }388 }389 390 391 writer::worker::context::context() { }392 393 writer::worker::context::~context()394 {395 if(!m_reqvec.empty()) {396 for(reqvec_t::iterator it(m_reqvec.begin()), it_end(m_reqvec.end());397 it != it_end;398 ++it) {399 // FIXME400 //it->finalize();401 }402 }403 }404 405 406 inline void writer::worker::context::add(char* ebit,407 const void* buf, size_t buflen, size_t off,408 finalize_t finalize, void* user,409 bool thread_safe)410 {411 struct iovec vec = { ((char*)buf)+off, buflen-off };412 m_bufvec.push_back(vec);413 try {414 m_reqvec.push_back(request(ebit, buf, buflen, finalize, user, thread_safe));415 } catch (...) {416 m_bufvec.pop_back();417 // FIXME418 }419 }420 421 422 iovec* writer::worker::context::vec()423 {424 return &m_bufvec[0];425 }426 427 int writer::worker::context::veclen()428 {429 return m_bufvec.size();430 }431 432 bool writer::worker::context::empty()433 {434 return m_bufvec.empty();435 }436 437 inline void writer::worker::context::consumed(int fd, int num)438 {439 for(int i=0; i < num; ++i) {440 m_reqvec[i].finalize(fd);441 }442 m_bufvec.erase(m_bufvec.begin(), m_bufvec.begin()+num);443 m_reqvec.erase(m_reqvec.begin(), m_reqvec.begin()+num);444 }445 446 447 writer::worker::worker()448 {449 m_ev.add(m_notify.getfd(), EV_READ);450 }451 452 writer::worker::~worker() {}453 454 455 void writer::worker::add(int fd, char* ebit,456 const void* buf, size_t buflen, size_t off,457 finalize_t finalize, void* user,458 bool thread_safe)459 {460 notify_entry e = {461 fd, ebit,462 buf, buflen, off,463 finalize, user, thread_safe464 };465 m_notify.send(e);466 }467 468 inline void writer::worker::add_impl(notify_entry& e)469 {470 if(!m_ev.test(e.fd)) {471 m_ev.add(e.fd, EV_WRITE);472 }473 m_ev.data(e.fd).add(e.ebit, e.buf, e.buflen, e.off,474 e.finalize, e.user, e.thread_safe);475 }476 477 void writer::worker::operator() ()478 {479 int fd;480 short event;481 context* pctx;482 while(!iothreads::is_end()) {483 while(m_ev.next(&fd, &event, &pctx)) {484 if(fd == m_notify.getfd()) {485 notify_entry e;486 while(m_notify.try_receive(&e)) {487 add_impl(e);488 }489 } else {490 context& ctx(m_ev.data(fd));491 ssize_t wl = ::writev(fd, ctx.vec(), ctx.veclen());492 if(wl < 0) {493 if(errno == EAGAIN || errno == EINTR) {494 continue;495 } else {496 m_ev.remove(fd, EV_WRITE); // FIXME497 continue;498 }499 } else if(wl == 0){500 m_ev.remove(fd, EV_WRITE); // FIXME501 continue;502 }503 int i;504 for(i=0; i < ctx.veclen(); ++i) {505 if(static_cast<size_t>(wl) >= ctx.vec()[i].iov_len) {506 wl -= ctx.vec()[i].iov_len;507 } else {508 ctx.vec()[i].iov_base = (void*)(((char*)ctx.vec()[i].iov_base) + wl);509 ctx.vec()[i].iov_len -= wl;510 break;511 }512 }513 if(i > 0) {514 ctx.consumed(fd, i);515 if(ctx.empty()) {516 m_ev.remove(fd, EV_WRITE);517 }518 }519 //sleep(1);520 //throw std::bad_alloc();521 }522 }523 m_ev.wait(); // FIXME524 }525 }526 527 528 // FIXME529 scoped_ptr<writer> writer::s_instance;530 72 531 73 -
lang/c/mpio/trunk/mp/iothreads_impl.pre.h
r18476 r19043 25 25 26 26 27 scoped_ptr<manager> manager::s_instance; // FIXME 28 29 void manager::initialize() 30 { 31 s_instance.reset(new manager()); 27 template <typename ThreadIMPL> 28 ThreadIMPL* manager::add_thread() { 29 return instance().add_thread_impl<ThreadIMPL>(); 32 30 } 33 34 void manager::destroy()35 {36 s_instance.reset(NULL);37 }38 39 40 manager::manager() :41 m_zone(m_source),42 m_end_flag(0)43 { }44 45 manager::~manager()46 { }47 48 49 template <typename ThreadIMPL>50 inline ThreadIMPL* manager::add_thread() { return instance().add_thread_impl<ThreadIMPL>(); }51 31 52 32 MP_ARGS_BEGIN 53 33 template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 54 inline ThreadIMPL* manager::add_thread(MP_ARGS_PARAMS) { return instance().add_thread_impl<ThreadIMPL>(MP_ARGS_FUNC); } 34 ThreadIMPL* manager::add_thread(MP_ARGS_PARAMS) { 35 return instance().add_thread_impl<ThreadIMPL>(MP_ARGS_FUNC); 36 } 55 37 MP_ARGS_END 56 38 … … 60 42 { 61 43 ThreadIMPL* impl = m_zone.allocate<ThreadIMPL>(); 62 m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 44 pthread_base* th = m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 45 m_threads.push_back(th); 63 46 return impl; 64 47 } … … 69 52 { 70 53 ThreadIMPL* impl = m_zone.allocate<ThreadIMPL, MP_ARGS_TYPES>(MP_ARGS_FUNC); 71 m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 54 pthread_base* th = m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 55 m_threads.push_back(th); 72 56 return impl; 73 57 } … … 75 59 76 60 77 inline void manager::submit(callback_message* msg) 78 { 79 instance().submit_impl(msg); 80 } 81 82 void manager::submit_impl(callback_message* msg) 61 inline void manager::submit_impl(callback_message* msg) 83 62 { 84 63 try { … … 90 69 } 91 70 71 72 inline void manager::submit(callback_message* msg) 73 { 74 instance().submit_impl(msg); 75 } 76 92 77 inline void manager::run() 93 78 { 94 79 instance().run_impl(); 95 80 } 96 97 void manager::run_impl()98 {99 messages_t cache;100 while(!m_end_flag) {101 m_messages.swap(cache);102 for(messages_t::iterator it(cache.begin()), it_end(cache.end());103 it != it_end;104 ++it) {105 callback_message* msg = *it;106 try {107 (*msg)();108 } catch (...) {109 // FIXME110 }111 }112 clear_message(cache);113 }114 }115 116 void manager::clear_message(messages_t& messages)117 {118 for(messages_t::iterator it(messages.begin()), it_end(messages.end());119 it != it_end;120 ++it) {121 delete *it;122 }123 messages.clear();124 }125 126 81 127 82 inline void manager::end() … … 136 91 137 92 93 inline void manager::join() 94 { 95 instance().join_impl(); 96 } 97 98 inline void manager::join_impl() 99 { 100 for(threads_t::iterator it(m_threads.begin()), it_end(m_threads.end()); 101 it != it_end; 102 ++it) { 103 (*it)->join(); 104 } 105 } 106 107 138 108 } // namespace iothreads 139 109 } // namespace mp -
lang/c/mpio/trunk/mp/multiplex.pre.h
r18607 r19043 93 93 typedef event<cb_t> ev_t; 94 94 ev_t m_ev; 95 source<> m_source; 96 zone<source<> > m_zone; 95 zone<> m_zone; 97 96 98 97 private: -
lang/c/mpio/trunk/mp/multiplex_impl.pre.h
r18476 r19043 139 139 void manager::add_impl(int fd) 140 140 { 141 // FIXME sparse_array 141 142 handler* h = m_zone.allocate<Handler>(fd); 142 143 m_ev.add(fd, EV_READ, h, EV_READ); … … 148 149 void manager::add_impl(int fd, MP_ARGS_PARAMS) 149 150 { 151 // FIXME sparse_array 150 152 handler* h = m_zone.allocate<Handler>(fd, MP_ARGS_FUNC); 151 153 m_ev.add(fd, EV_READ, h, EV_READ); … … 275 277 { 276 278 int fd = cb.get().fd(); 279 // FIXME remove handler form sparse_array 277 280 m_ev.remove(fd, cb.event()); 278 281 ::close(fd); -
lang/c/mpio/trunk/mp/sparse_array_impl.pre.h
r18183 r19043 126 126 void* sparse_array<T>::set_impl(size_type index) 127 127 { 128 while( base_array.size() * EXTEND_ARRAY_SIZE <= index) {128 while( base_array.size() <= index / EXTEND_ARRAY_SIZE ) { 129 129 extend_array(); 130 130 } -
lang/c/mpio/trunk/mp/utility.pre.h
r18303 r19043 20 20 #define MP_UTILITY_H__ 21 21 22 #include <unistd.h> 23 #include <fcntl.h> 22 24 #include <memory> 23 25 … … 31 33 32 34 typedef thread_tag<0> main_thread_tag; 35 36 37 inline void set_nonblock(int fd) 38 { 39 // FIXME error 40 fcntl(fd, F_SETFL, O_NONBLOCK); 41 } 33 42 34 43
![(please configure the [header_logo] section in trac.ini)](/share/chrome/site/your_project_logo.png)