Changeset 18476 for lang/c/mpio/trunk/mp/multiplex/connect.h
- Timestamp:
- 08/30/08 15:15:48 (4 months ago)
- Files:
-
- 1 modified
-
lang/c/mpio/trunk/mp/multiplex/connect.h (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
lang/c/mpio/trunk/mp/multiplex/connect.h
r18340 r18476 23 23 #include <sys/types.h> 24 24 #include <sys/socket.h> 25 #include <netinet/in.h> 26 #include <arpa/inet.h> 25 27 #include <errno.h> 26 28 … … 31 33 struct connector { 32 34 private: 33 static void initialize(unsigned int num_threads); 34 static void destroy(); 35 static void initialize(unsigned int num_threads) 36 { 37 s_instance.reset(new connector(num_threads)); 38 } 39 40 static void destroy() 41 { 42 s_instance.reset(NULL); 43 } 35 44 36 45 private: 46 class entry { 47 public: 48 entry(socklen_t socklen, void (*callback)(void*, sockaddr*, socklen_t, int), void* obj) 49 m_socklen(socklen), m_callback(callback), m_obj(obj), 50 m_fd(0) {} 51 ~entry(); 52 public: 53 sockaddr* addr() { return (sockaddr*)(((char*)this)+sizeof(entry)); } 54 socklen_t addrlen() { return m_socklen; } 55 void setfd(int fd) { m_fd = fd; } 56 public: 57 void callback() 58 { 59 (*m_callback)(m_obj, addr(), m_socklen, m_fd); 60 } 61 private: 62 socklen_t m_socklen; 63 void (*m_callback)(void*, sockaddr*, socklen_t, int); 64 void* m_obj; 65 int m_fd; 66 }; 67 37 68 class connect_thread : public pthread_thread<connect_thread> { 38 69 public: 39 // FIXME 70 connect_thread(fdnotify<entry*>& return_notify) : 71 m_return_notify(return_notify) 72 { 73 m_ev.add(m_notify.getfd(), EV_READ); // FIXME error 74 } 75 public: 76 void operator() () 77 { 78 entry* pe; 79 while(!multiplex::is_end()) { 80 while(m_notify.try_receive(&pe)) { 81 int sock = ::socket(AF_INET, SOCK_STREAM, 0); // FIXME AF_INET6 82 if(sock < 0) { continue; } // FIXME 83 if(::connect(sock, pe->addr(), pe->socklen()) < 0) { 84 ::close(sock); 85 } else { 86 pe->setfd(sock); 87 } 88 m_return_notify.send(pe); 89 } 90 m_ev.wait(); // FIXME timeout / error 91 } 92 } 93 public: 94 void connect(entry* e) { m_notify.send(e); } 95 private: 96 typedef event<> ev_t; 97 ev_t m_ev; 98 fdnotify<entry*> m_return_notify; 99 fdnotify<entry*> m_notify; 40 100 }; 41 101 102 103 typedef source<sizeof(entry)+sizeof(sockaddr_in), 16> source_t; 104 42 105 class connect_handler : public handler { 106 public: 107 connect_handler(int fd, fdnotify<event*>& return_notify, souce_t& source) : 108 handler(fd), m_return_notify(return_notify), m_source(source) {} 43 109 pubilc: 44 // FIXME 110 void read_event() 111 { 112 entry* pe; 113 while(m_return_notify.try_receive(&pe)) { 114 try { 115 pe->callback(); 116 } catch(...) { 117 pe->~entry(); 118 m_source.free(pe); 119 throw; 120 } 121 pe->~entry(); 122 m_source.free(pe); 123 } 124 } 125 private: 126 fdnotify<>& m_return_notify; 127 source_t& m_source; 45 128 }; 46 129 47 130 private: 48 connector(unsigned int num_threads) 131 connector(unsigned int num_threads) : 132 m_num_threads(num_threads), 133 m_rr_seed(0) 49 134 { 135 m_threads = (connect_thread**)m_source.malloc(sizeof(m_threads*)*num_threads); 50 136 for(unsigned int i=0; i < num_threads; ++i) { 51 iothreads::add_thread<connect_thread>(); 52 // FIXME 137 m_threads[i] = m_zone.allocate<connect_thread>(m_return_notify); 53 138 } 139 multiplex::add<connect_handler>(m_return_notify.getfd(), m_source); 54 140 } 55 141 56 142 ~connector() {} // FIXME 57 143 58 // entry 59 // return_entry 144 void connect_impl(sockaddr* addr, socklen_t addrlen, void (*callback)(void*, sockaddr*, socklen_t, int), void* obj) 145 { 146 void* mem = m_source.malloc(sizeof(entry)+addrlen); 147 try { 148 entry* e = new (mem) entry(addr, addrlen, callback, obj); 149 try { 150 m_threads[m_rr_seed % m_num_threads]->connect(e); 151 ++m_rr_seed; 152 } catch (...) { 153 e->~entry(); 154 } 155 } catch(...) { 156 m_source.free(mem); 157 } 158 } 60 159 61 ~connector() {} // FIXME 160 private: 161 source_t m_source; 162 zone<source> m_zone; 163 connect_thread** m_threads; 164 fdnotify<entry*> m_return_notify; 165 const unsigned int m_num_threads; 166 unsigned int m_rr_seed; 62 167 63 168 private: 64 169 static scoped_ptr<connector> s_instance; 65 170 static connector& instance(); 171 172 public: 173 template <typename T> 174 static void connect(sockaddr* addr, socklen_t addrlen, void (*callback)(void*, sockaddr*, socklen_t, int), T* obj) 175 { 176 s_instance->connect_impl(addr, addrlen, callback, 177 reinterpret_cast<void*>(obj)); 178 } 66 179 }; 67 180 … … 74 187 75 188 189 // FIXME 190 scoped_ptr<connector> connector::s_instance; 191 192 76 193 } // namespace multiplex 77 194 } // namespace mp
![(please configure the [header_logo] section in trac.ini)](/share/chrome/site/your_project_logo.png)