- Timestamp:
- 08/30/08 15:15:48 (3 months ago)
- Location:
- lang/c/mpio/trunk/mp
- Files:
-
- 2 added
- 8 modified
-
iothreads.pre.h (modified) (3 diffs)
-
iothreads/connect.h (modified) (3 diffs)
-
iothreads/listen.h (modified) (1 diff)
-
iothreads/reader.h (added)
-
iothreads/timer.h (modified) (1 diff)
-
iothreads/writer.h (added)
-
iothreads_impl.pre.h (modified) (1 diff)
-
multiplex.pre.h (modified) (1 diff)
-
multiplex/connect.h (modified) (3 diffs)
-
multiplex_impl.pre.h (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
lang/c/mpio/trunk/mp/iothreads.pre.h
r18340 r18476 54 54 public: 55 55 template <typename ThreadIMPL> 56 static voidadd_thread();56 static ThreadIMPL* add_thread(); 57 57 MP_ARGS_BEGIN 58 58 template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 59 static voidadd_thread(MP_ARGS_PARAMS);59 static ThreadIMPL* add_thread(MP_ARGS_PARAMS); 60 60 MP_ARGS_END 61 61 … … 70 70 private: 71 71 template <typename ThreadIMPL> 72 voidadd_thread_impl();72 ThreadIMPL* add_thread_impl(); 73 73 MP_ARGS_BEGIN 74 74 template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 75 voidadd_thread_impl(MP_ARGS_PARAMS);75 ThreadIMPL* add_thread_impl(MP_ARGS_PARAMS); 76 76 MP_ARGS_END 77 77 … … 145 145 146 146 #include "mp/iothreads_impl.h" 147 #include "mp/iothreads/writer.h" 147 148 #include "mp/iothreads/listen.h" 148 149 #include "mp/iothreads/timer.h" 149 150 150 151 /*152 namespace mp {153 154 155 template <typename ThreadTag = main_thread_tag>156 class iothreads {157 public:158 159 template <typename IMPL>160 class handler {161 public:162 handler(int fd);163 virtual ~handler();164 protected:165 template <typename T> // fixme166 void submit(void (*callback)(T* obj, void* user), T* obj, void* user);167 public:168 //virtual int read_event() = 0;169 };170 171 template <typename IMPL>172 class stream_handler : public handler {173 public174 stream_handler(int fd); // allocate buffer175 virtual ~stream_handler();176 public:177 int read_event();178 public:179 // throw exception to close fd180 //virtual void receive_data(stream& s) = 0;181 //virtual void unbind() = 0;182 };183 184 template <typename IMPL>185 class timer : public handler {186 public:187 timer(int fd);188 virtual ~timer();189 public:190 static start(double interval);191 public:192 //virtual void triger() = 0;193 };194 195 196 struct thread {197 virtual ~thread() {}198 virtual void run() = 0;199 };200 201 class io_thread : public thread {202 public:203 io_thread();204 ~io_thread();205 public:206 void run();207 private:208 struct cb_t {209 public:210 cb_t();211 ~cb_t();212 public:213 short event();214 void event(short ev);215 handler& handler();216 // fixme buffer accessor217 private:218 short m_event;219 handler& m_handler;220 char* m_wbuffer; // fixme221 };222 private:223 void close_connection(cb_t& cb);224 void rswitch(cb_t& cb);225 void wswitch(cb_t& cb);226 public:227 void add(handler* handler);228 bool have(int fd);229 void send_data(int fd, char* data, size_t len); // fixme230 void send_message(int fd, char* msg, size_t len); // fixme231 private:232 // fixme233 void msg_add();234 void msg_send_data();235 void msg_send_message();236 };237 238 239 public:240 static void initialize(unsigned int num_threads);241 static void destroy();242 ~buffer();243 244 template <typename Handler>245 void add(int fd);246 MP_ARGS_BEGIN247 template <typename Handler, MP_ARGS_TEMPLATE>248 inline int add(int fd, MP_ARGS_PARAMS);249 MP_ARGS_END250 251 public:252 void remove(handler* ctx);253 254 public:255 void send_data(int fd, const char* data, size_t len);256 void send_message(int fd, const char* data, size_t len); // fixme257 258 public:259 template <typename Thread>260 void custom_thread();261 MP_ARGS_BEGIN262 template <typename Thread, MP_ARGS_TEMPLATE>263 inline int custom_thread(MP_ARGS_PARAMS);264 MP_ARGS_END265 266 public:267 void run();268 void end();269 bool is_end() const;270 271 public:272 void submit(void* data); // fixme273 274 private:275 iothreads(unsigned int num_threads);276 277 private:278 unsigned int m_num_threads;279 280 private:281 iothreads(const iothreads&);282 iothreads();283 };284 285 286 287 } // namespace mp288 */289 290 291 151 #endif /* mp/iothreads.h */ 292 152 -
lang/c/mpio/trunk/mp/iothreads/connect.h
r18340 r18476 35 35 36 36 private: 37 class connect_thread : public pthread_thread<connect_thread>{37 class connect_thread { 38 38 public: 39 39 // FIXME … … 78 78 { 79 79 for(unsigned int i=0; i < num_threads; ++i) { 80 iothreads:: add_thread<connect_thread>();80 iothreads::manager::add_thread<connect_thread>(); 81 81 // FIXME 82 82 } … … 103 103 #endif /* mp/iothreads/listen.h */ 104 104 105 -
lang/c/mpio/trunk/mp/iothreads/listen.h
r18340 r18476 31 31 struct listener { 32 32 private: 33 class listen_thread : public pthread_thread<listen_thread>{33 class listen_thread { 34 34 public: 35 35 listen_thread(int lsock, void (*callback)(void*, int fd), void* obj) : 36 pthread_thread<listen_thread>(this),37 36 m_lsock(lsock), m_callback(callback), m_obj(obj) {} 38 37 void operator() () -
lang/c/mpio/trunk/mp/iothreads/timer.h
r18340 r18476 28 28 struct timer { 29 29 private: 30 class timer_thread : public pthread_thread<timer_thread>{30 class timer_thread { 31 31 public: 32 32 timer_thread(double interval_sec, void (*callback)(void*), void* obj) : 33 pthread_thread<timer_thread>(this),34 33 m_interval_usec(interval_sec * 1000 * 1000), 35 34 m_callback(callback), -
lang/c/mpio/trunk/mp/iothreads_impl.pre.h
r18311 r18476 48 48 49 49 template <typename ThreadIMPL> 50 inline void manager::add_thread() {instance().add_thread_impl<ThreadIMPL>(); }50 inline ThreadIMPL* manager::add_thread() { return instance().add_thread_impl<ThreadIMPL>(); } 51 51 52 52 MP_ARGS_BEGIN 53 53 template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 54 inline void manager::add_thread(MP_ARGS_PARAMS) {instance().add_thread_impl<ThreadIMPL>(MP_ARGS_FUNC); }54 inline ThreadIMPL* manager::add_thread(MP_ARGS_PARAMS) { return instance().add_thread_impl<ThreadIMPL>(MP_ARGS_FUNC); } 55 55 MP_ARGS_END 56 56 57 57 58 58 template <typename ThreadIMPL> 59 voidmanager::add_thread_impl()59 ThreadIMPL* manager::add_thread_impl() 60 60 { 61 61 ThreadIMPL* impl = m_zone.allocate<ThreadIMPL>(); 62 62 m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 63 return impl; 63 64 } 64 65 65 66 MP_ARGS_BEGIN 66 67 template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 67 voidmanager::add_thread_impl(MP_ARGS_PARAMS)68 ThreadIMPL* manager::add_thread_impl(MP_ARGS_PARAMS) 68 69 { 69 70 ThreadIMPL* impl = m_zone.allocate<ThreadIMPL, MP_ARGS_TYPES>(MP_ARGS_FUNC); 70 71 m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 72 return impl; 71 73 } 72 74 MP_ARGS_END -
lang/c/mpio/trunk/mp/multiplex.pre.h
r18340 r18476 36 36 virtual void connected() {} 37 37 public: 38 int fd() const { return m_fd; } 38 int fd() const 39 { return m_fd; } 40 void send_data(const char* buf, size_t len) 41 { return manager::send_data(m_fd, buf, len); } 39 42 private: 40 43 int m_fd; -
lang/c/mpio/trunk/mp/multiplex/connect.h
r18340 r18476 23 23 #include <sys/types.h> 24 24 #include <sys/socket.h> 25 #include <netinet/in.h> 26 #include <arpa/inet.h> 25 27 #include <errno.h> 26 28 … … 31 33 struct connector { 32 34 private: 33 static void initialize(unsigned int num_threads); 34 static void destroy(); 35 static void initialize(unsigned int num_threads) 36 { 37 s_instance.reset(new connector(num_threads)); 38 } 39 40 static void destroy() 41 { 42 s_instance.reset(NULL); 43 } 35 44 36 45 private: 46 class entry { 47 public: 48 entry(socklen_t socklen, void (*callback)(void*, sockaddr*, socklen_t, int), void* obj) 49 m_socklen(socklen), m_callback(callback), m_obj(obj), 50 m_fd(0) {} 51 ~entry(); 52 public: 53 sockaddr* addr() { return (sockaddr*)(((char*)this)+sizeof(entry)); } 54 socklen_t addrlen() { return m_socklen; } 55 void setfd(int fd) { m_fd = fd; } 56 public: 57 void callback() 58 { 59 (*m_callback)(m_obj, addr(), m_socklen, m_fd); 60 } 61 private: 62 socklen_t m_socklen; 63 void (*m_callback)(void*, sockaddr*, socklen_t, int); 64 void* m_obj; 65 int m_fd; 66 }; 67 37 68 class connect_thread : public pthread_thread<connect_thread> { 38 69 public: 39 // FIXME 70 connect_thread(fdnotify<entry*>& return_notify) : 71 m_return_notify(return_notify) 72 { 73 m_ev.add(m_notify.getfd(), EV_READ); // FIXME error 74 } 75 public: 76 void operator() () 77 { 78 entry* pe; 79 while(!multiplex::is_end()) { 80 while(m_notify.try_receive(&pe)) { 81 int sock = ::socket(AF_INET, SOCK_STREAM, 0); // FIXME AF_INET6 82 if(sock < 0) { continue; } // FIXME 83 if(::connect(sock, pe->addr(), pe->socklen()) < 0) { 84 ::close(sock); 85 } else { 86 pe->setfd(sock); 87 } 88 m_return_notify.send(pe); 89 } 90 m_ev.wait(); // FIXME timeout / error 91 } 92 } 93 public: 94 void connect(entry* e) { m_notify.send(e); } 95 private: 96 typedef event<> ev_t; 97 ev_t m_ev; 98 fdnotify<entry*> m_return_notify; 99 fdnotify<entry*> m_notify; 40 100 }; 41 101 102 103 typedef source<sizeof(entry)+sizeof(sockaddr_in), 16> source_t; 104 42 105 class connect_handler : public handler { 106 public: 107 connect_handler(int fd, fdnotify<event*>& return_notify, souce_t& source) : 108 handler(fd), m_return_notify(return_notify), m_source(source) {} 43 109 pubilc: 44 // FIXME 110 void read_event() 111 { 112 entry* pe; 113 while(m_return_notify.try_receive(&pe)) { 114 try { 115 pe->callback(); 116 } catch(...) { 117 pe->~entry(); 118 m_source.free(pe); 119 throw; 120 } 121 pe->~entry(); 122 m_source.free(pe); 123 } 124 } 125 private: 126 fdnotify<>& m_return_notify; 127 source_t& m_source; 45 128 }; 46 129 47 130 private: 48 connector(unsigned int num_threads) 131 connector(unsigned int num_threads) : 132 m_num_threads(num_threads), 133 m_rr_seed(0) 49 134 { 135 m_threads = (connect_thread**)m_source.malloc(sizeof(m_threads*)*num_threads); 50 136 for(unsigned int i=0; i < num_threads; ++i) { 51 iothreads::add_thread<connect_thread>(); 52 // FIXME 137 m_threads[i] = m_zone.allocate<connect_thread>(m_return_notify); 53 138 } 139 multiplex::add<connect_handler>(m_return_notify.getfd(), m_source); 54 140 } 55 141 56 142 ~connector() {} // FIXME 57 143 58 // entry 59 // return_entry 144 void connect_impl(sockaddr* addr, socklen_t addrlen, void (*callback)(void*, sockaddr*, socklen_t, int), void* obj) 145 { 146 void* mem = m_source.malloc(sizeof(entry)+addrlen); 147 try { 148 entry* e = new (mem) entry(addr, addrlen, callback, obj); 149 try { 150 m_threads[m_rr_seed % m_num_threads]->connect(e); 151 ++m_rr_seed; 152 } catch (...) { 153 e->~entry(); 154 } 155 } catch(...) { 156 m_source.free(mem); 157 } 158 } 60 159 61 ~connector() {} // FIXME 160 private: 161 source_t m_source; 162 zone<source> m_zone; 163 connect_thread** m_threads; 164 fdnotify<entry*> m_return_notify; 165 const unsigned int m_num_threads; 166 unsigned int m_rr_seed; 62 167 63 168 private: 64 169 static scoped_ptr<connector> s_instance; 65 170 static connector& instance(); 171 172 public: 173 template <typename T> 174 static void connect(sockaddr* addr, socklen_t addrlen, void (*callback)(void*, sockaddr*, socklen_t, int), T* obj) 175 { 176 s_instance->connect_impl(addr, addrlen, callback, 177 reinterpret_cast<void*>(obj)); 178 } 66 179 }; 67 180 … … 74 187 75 188 189 // FIXME 190 scoped_ptr<connector> connector::s_instance; 191 192 76 193 } // namespace multiplex 77 194 } // namespace mp -
lang/c/mpio/trunk/mp/multiplex_impl.pre.h
r18340 r18476 57 57 manager::cb_t::cb_t(handler* h, short ev) : 58 58 m_handler(h), m_event(ev), 59 m_wbuffer(NULL), m_allocated(0), m_used(0) {} 59 m_wbuffer((char*)malloc(2048)), m_allocated(2048), m_used(0) 60 { 61 if(!m_wbuffer) { throw std::bad_alloc(); } 62 } 60 63 61 64 manager::cb_t::~cb_t() … … 98 101 m_used = 0; 99 102 } else { 100 std::memmove(m_wbuffer, m_wbuffer+len, len);103 std::memmove(m_wbuffer, m_wbuffer+len, m_used-len); 101 104 } 102 105 } … … 169 172 void manager::send_data_impl(int fd, const char* buf, size_t len) 170 173 { 171 m_ev.data(fd).append_buffer(buf, len); 174 cb_t& cb( m_ev.data(fd) ); 175 cb.append_buffer(buf, len); 176 m_wswitch_ctx.push_back(&cb); 172 177 } 173 178 … … 196 201 while(!m_end_flag) { 197 202 while( m_ev.next(&fd, &event, &pcb) ) { 198 for(wswitch_ctx_t::iterator it(m_wswitch_ctx.begin()), it_end(m_wswitch_ctx.end());199 it != it_end;200 ++it) {201 if( try_write(*pcb) ) {202 wswitch(*pcb);203 }204 }205 m_wswitch_ctx.clear();206 203 if( event & EV_READ ) { 207 204 try_read(*pcb); … … 211 208 } 212 209 } 210 for(wswitch_ctx_t::iterator it(m_wswitch_ctx.begin()), it_end(m_wswitch_ctx.end()); 211 it != it_end; 212 ++it) { 213 if( try_write(*pcb) ) { 214 wswitch(*pcb); 215 } 216 } 217 m_wswitch_ctx.clear(); 213 218 if( (ret = m_ev.wait()) < 0 ) { return ret; } 214 219 } … … 229 234 bool manager::try_write(cb_t& cb) 230 235 { 236 if(cb.length() == 0) { return false; } 231 237 ssize_t len = ::write(cb.get().fd(), cb.wbuffer(), cb.length()); 232 238 if(len < 0) {
![(please configure the [header_logo] section in trac.ini)](/share/chrome/site/your_project_logo.png)