Show
Ignore:
Timestamp:
08/30/08 15:15:48 (4 months ago)
Author:
frsyuki
Message:

lang/c/mpio: added mp::iothreads::reader and mp::iothreads::writer

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • lang/c/mpio/trunk/mp/multiplex/connect.h

    r18340 r18476  
    2323#include <sys/types.h> 
    2424#include <sys/socket.h> 
     25#include <netinet/in.h> 
     26#include <arpa/inet.h> 
    2527#include <errno.h> 
    2628 
     
    3133struct connector { 
    3234private: 
    33         static void initialize(unsigned int num_threads); 
    34         static void destroy(); 
     35        static void initialize(unsigned int num_threads) 
     36        { 
     37                s_instance.reset(new connector(num_threads)); 
     38        } 
     39 
     40        static void destroy() 
     41        { 
     42                s_instance.reset(NULL); 
     43        } 
    3544 
    3645private: 
     46        class entry { 
     47        public: 
     48                entry(socklen_t socklen, void (*callback)(void*, sockaddr*, socklen_t, int), void* obj) 
     49                        m_socklen(socklen), m_callback(callback), m_obj(obj), 
     50                        m_fd(0) {} 
     51                ~entry(); 
     52        public: 
     53                sockaddr* addr() { return (sockaddr*)(((char*)this)+sizeof(entry)); } 
     54                socklen_t addrlen() { return m_socklen; } 
     55                void setfd(int fd) { m_fd = fd; } 
     56        public: 
     57                void callback() 
     58                { 
     59                        (*m_callback)(m_obj, addr(), m_socklen, m_fd); 
     60                } 
     61        private: 
     62                socklen_t m_socklen; 
     63                void (*m_callback)(void*, sockaddr*, socklen_t, int); 
     64                void* m_obj; 
     65                int m_fd; 
     66        }; 
     67 
    3768        class connect_thread : public pthread_thread<connect_thread> { 
    3869        public: 
    39                 // FIXME 
     70                connect_thread(fdnotify<entry*>& return_notify) : 
     71                        m_return_notify(return_notify) 
     72                { 
     73                        m_ev.add(m_notify.getfd(), EV_READ);  // FIXME error 
     74                } 
     75        public: 
     76                void operator() () 
     77                { 
     78                        entry* pe; 
     79                        while(!multiplex::is_end()) { 
     80                                while(m_notify.try_receive(&pe)) { 
     81                                        int sock = ::socket(AF_INET, SOCK_STREAM, 0);  // FIXME AF_INET6 
     82                                        if(sock < 0) { continue; }  // FIXME 
     83                                        if(::connect(sock, pe->addr(), pe->socklen()) < 0) { 
     84                                                ::close(sock); 
     85                                        } else { 
     86                                                pe->setfd(sock); 
     87                                        } 
     88                                        m_return_notify.send(pe); 
     89                                } 
     90                                m_ev.wait();  // FIXME  timeout / error 
     91                        } 
     92                } 
     93        public: 
     94                void connect(entry* e) { m_notify.send(e); } 
     95        private: 
     96                typedef event<> ev_t; 
     97                ev_t m_ev; 
     98                fdnotify<entry*> m_return_notify; 
     99                fdnotify<entry*> m_notify; 
    40100        }; 
    41101 
     102 
     103        typedef source<sizeof(entry)+sizeof(sockaddr_in), 16> source_t; 
     104 
    42105        class connect_handler : public handler { 
     106        public: 
     107                connect_handler(int fd, fdnotify<event*>& return_notify, souce_t& source) : 
     108                        handler(fd), m_return_notify(return_notify), m_source(source) {} 
    43109        pubilc: 
    44                 // FIXME 
     110                void read_event() 
     111                { 
     112                        entry* pe; 
     113                        while(m_return_notify.try_receive(&pe)) { 
     114                                try { 
     115                                        pe->callback(); 
     116                                } catch(...) { 
     117                                        pe->~entry(); 
     118                                        m_source.free(pe); 
     119                                        throw; 
     120                                } 
     121                                pe->~entry(); 
     122                                m_source.free(pe); 
     123                        } 
     124                } 
     125        private: 
     126                fdnotify<>& m_return_notify; 
     127                source_t& m_source; 
    45128        }; 
    46129 
    47130private: 
    48         connector(unsigned int num_threads) 
     131        connector(unsigned int num_threads) : 
     132                m_num_threads(num_threads), 
     133                m_rr_seed(0) 
    49134        { 
     135                m_threads = (connect_thread**)m_source.malloc(sizeof(m_threads*)*num_threads); 
    50136                for(unsigned int i=0; i < num_threads; ++i) { 
    51                         iothreads::add_thread<connect_thread>(); 
    52                         // FIXME 
     137                        m_threads[i] = m_zone.allocate<connect_thread>(m_return_notify); 
    53138                } 
     139                multiplex::add<connect_handler>(m_return_notify.getfd(), m_source); 
    54140        } 
    55141 
    56142        ~connector() {}  // FIXME 
    57143 
    58         // entry 
    59         // return_entry 
     144        void connect_impl(sockaddr* addr, socklen_t addrlen, void (*callback)(void*, sockaddr*, socklen_t, int), void* obj) 
     145        { 
     146                void* mem = m_source.malloc(sizeof(entry)+addrlen); 
     147                try { 
     148                        entry* e = new (mem) entry(addr, addrlen, callback, obj); 
     149                        try { 
     150                                m_threads[m_rr_seed % m_num_threads]->connect(e); 
     151                                ++m_rr_seed; 
     152                        } catch (...) { 
     153                                e->~entry(); 
     154                        } 
     155                } catch(...) { 
     156                        m_source.free(mem); 
     157                } 
     158        } 
    60159 
    61         ~connector() {}  // FIXME 
     160private: 
     161        source_t m_source; 
     162        zone<source> m_zone; 
     163        connect_thread** m_threads; 
     164        fdnotify<entry*> m_return_notify; 
     165        const unsigned int m_num_threads; 
     166        unsigned int m_rr_seed; 
    62167 
    63168private: 
    64169        static scoped_ptr<connector> s_instance; 
    65170        static connector& instance(); 
     171 
     172public: 
     173        template <typename T> 
     174        static void connect(sockaddr* addr, socklen_t addrlen, void (*callback)(void*, sockaddr*, socklen_t, int), T* obj) 
     175        { 
     176                s_instance->connect_impl(addr, addrlen, callback, 
     177                                reinterpret_cast<void*>(obj)); 
     178        } 
    66179}; 
    67180 
     
    74187 
    75188 
     189// FIXME 
     190scoped_ptr<connector> connector::s_instance; 
     191 
     192 
    76193}  // namespace multiplex 
    77194}  // namespace mp