Changeset 18476 for lang

Show
Ignore:
Timestamp:
08/30/08 15:15:48 (3 months ago)
Author:
frsyuki
Message:

lang/c/mpio: added mp::iothreads::reader and mp::iothreads::writer

Location:
lang/c/mpio/trunk/mp
Files:
2 added
8 modified

Legend:

Unmodified
Added
Removed
  • lang/c/mpio/trunk/mp/iothreads.pre.h

    r18340 r18476  
    5454public: 
    5555        template <typename ThreadIMPL> 
    56         static void add_thread(); 
     56        static ThreadIMPL* add_thread(); 
    5757MP_ARGS_BEGIN 
    5858        template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 
    59         static void add_thread(MP_ARGS_PARAMS); 
     59        static ThreadIMPL* add_thread(MP_ARGS_PARAMS); 
    6060MP_ARGS_END 
    6161 
     
    7070private: 
    7171        template <typename ThreadIMPL> 
    72         void add_thread_impl(); 
     72        ThreadIMPL* add_thread_impl(); 
    7373MP_ARGS_BEGIN 
    7474        template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 
    75         void add_thread_impl(MP_ARGS_PARAMS); 
     75        ThreadIMPL* add_thread_impl(MP_ARGS_PARAMS); 
    7676MP_ARGS_END 
    7777 
     
    145145 
    146146#include "mp/iothreads_impl.h" 
     147#include "mp/iothreads/writer.h" 
    147148#include "mp/iothreads/listen.h" 
    148149#include "mp/iothreads/timer.h" 
    149150 
    150  
    151 /* 
    152 namespace mp { 
    153  
    154  
    155 template <typename ThreadTag = main_thread_tag> 
    156 class iothreads { 
    157 public: 
    158  
    159         template <typename IMPL> 
    160         class handler { 
    161         public: 
    162                 handler(int fd); 
    163                 virtual ~handler(); 
    164         protected: 
    165                 template <typename T>  // fixme 
    166                 void submit(void (*callback)(T* obj, void* user), T* obj, void* user); 
    167         public: 
    168                 //virtual int read_event() = 0; 
    169         }; 
    170  
    171         template <typename IMPL> 
    172         class stream_handler : public handler { 
    173         public 
    174                 stream_handler(int fd);  // allocate buffer 
    175                 virtual ~stream_handler(); 
    176         public: 
    177                 int read_event(); 
    178         public: 
    179                 // throw exception to close fd 
    180                 //virtual void receive_data(stream& s) = 0; 
    181                 //virtual void unbind() = 0; 
    182         }; 
    183  
    184         template <typename IMPL> 
    185         class timer : public handler { 
    186         public: 
    187                 timer(int fd); 
    188                 virtual ~timer(); 
    189         public: 
    190                 static start(double interval); 
    191         public: 
    192                 //virtual void triger() = 0; 
    193         }; 
    194  
    195  
    196         struct thread { 
    197                 virtual ~thread() {} 
    198                 virtual void run() = 0; 
    199         }; 
    200  
    201         class io_thread : public thread { 
    202         public: 
    203                 io_thread(); 
    204                 ~io_thread(); 
    205         public: 
    206                 void run(); 
    207         private: 
    208                 struct cb_t { 
    209                 public: 
    210                         cb_t(); 
    211                         ~cb_t(); 
    212                 public: 
    213                         short event(); 
    214                         void event(short ev); 
    215                         handler& handler(); 
    216                         // fixme buffer accessor 
    217                 private: 
    218                         short m_event; 
    219                         handler& m_handler; 
    220                         char* m_wbuffer;  // fixme 
    221                 }; 
    222         private: 
    223                 void close_connection(cb_t& cb); 
    224                 void rswitch(cb_t& cb); 
    225                 void wswitch(cb_t& cb); 
    226         public: 
    227                 void add(handler* handler); 
    228                 bool have(int fd); 
    229                 void send_data(int fd, char* data, size_t len);  // fixme 
    230                 void send_message(int fd, char* msg, size_t len);  // fixme 
    231         private: 
    232                 // fixme 
    233                 void msg_add(); 
    234                 void msg_send_data(); 
    235                 void msg_send_message(); 
    236         }; 
    237  
    238  
    239 public: 
    240         static void initialize(unsigned int num_threads); 
    241         static void destroy(); 
    242         ~buffer(); 
    243  
    244         template <typename Handler> 
    245         void add(int fd); 
    246 MP_ARGS_BEGIN 
    247         template <typename Handler, MP_ARGS_TEMPLATE> 
    248         inline int add(int fd, MP_ARGS_PARAMS); 
    249 MP_ARGS_END 
    250  
    251 public: 
    252         void remove(handler* ctx); 
    253  
    254 public: 
    255         void send_data(int fd, const char* data, size_t len); 
    256         void send_message(int fd, const char* data, size_t len);  // fixme 
    257  
    258 public: 
    259         template <typename Thread> 
    260         void custom_thread(); 
    261 MP_ARGS_BEGIN 
    262         template <typename Thread, MP_ARGS_TEMPLATE> 
    263         inline int custom_thread(MP_ARGS_PARAMS); 
    264 MP_ARGS_END 
    265  
    266 public: 
    267         void run(); 
    268         void end(); 
    269         bool is_end() const; 
    270  
    271 public: 
    272         void submit(void* data);  // fixme 
    273  
    274 private: 
    275         iothreads(unsigned int num_threads); 
    276  
    277 private: 
    278         unsigned int m_num_threads; 
    279  
    280 private: 
    281         iothreads(const iothreads&); 
    282         iothreads(); 
    283 }; 
    284  
    285  
    286  
    287 }  // namespace mp 
    288 */ 
    289  
    290  
    291151#endif /* mp/iothreads.h */ 
    292152 
  • lang/c/mpio/trunk/mp/iothreads/connect.h

    r18340 r18476  
    3535 
    3636private: 
    37         class connect_thread : public pthread_thread<connect_thread> { 
     37        class connect_thread { 
    3838        public: 
    3939                // FIXME 
     
    7878        { 
    7979                for(unsigned int i=0; i < num_threads; ++i) { 
    80                         iothreads::add_thread<connect_thread>(); 
     80                        iothreads::manager::add_thread<connect_thread>(); 
    8181                        // FIXME 
    8282                } 
     
    103103#endif /* mp/iothreads/listen.h */ 
    104104 
    105  
  • lang/c/mpio/trunk/mp/iothreads/listen.h

    r18340 r18476  
    3131struct listener { 
    3232private: 
    33         class listen_thread : public pthread_thread<listen_thread> { 
     33        class listen_thread { 
    3434        public: 
    3535                listen_thread(int lsock, void (*callback)(void*, int fd), void* obj) : 
    36                         pthread_thread<listen_thread>(this), 
    3736                        m_lsock(lsock), m_callback(callback), m_obj(obj) {} 
    3837                void operator() () 
  • lang/c/mpio/trunk/mp/iothreads/timer.h

    r18340 r18476  
    2828struct timer { 
    2929private: 
    30         class timer_thread : public pthread_thread<timer_thread> { 
     30        class timer_thread { 
    3131        public: 
    3232                timer_thread(double interval_sec, void (*callback)(void*), void* obj) : 
    33                         pthread_thread<timer_thread>(this), 
    3433                        m_interval_usec(interval_sec * 1000 * 1000), 
    3534                        m_callback(callback), 
  • lang/c/mpio/trunk/mp/iothreads_impl.pre.h

    r18311 r18476  
    4848 
    4949template <typename ThreadIMPL> 
    50 inline void manager::add_thread() { instance().add_thread_impl<ThreadIMPL>(); } 
     50inline ThreadIMPL* manager::add_thread() { return instance().add_thread_impl<ThreadIMPL>(); } 
    5151 
    5252MP_ARGS_BEGIN 
    5353template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 
    54 inline void manager::add_thread(MP_ARGS_PARAMS) { instance().add_thread_impl<ThreadIMPL>(MP_ARGS_FUNC); } 
     54inline ThreadIMPL* manager::add_thread(MP_ARGS_PARAMS) { return instance().add_thread_impl<ThreadIMPL>(MP_ARGS_FUNC); } 
    5555MP_ARGS_END 
    5656 
    5757 
    5858template <typename ThreadIMPL> 
    59 void manager::add_thread_impl() 
     59ThreadIMPL* manager::add_thread_impl() 
    6060{ 
    6161        ThreadIMPL* impl = m_zone.allocate<ThreadIMPL>(); 
    6262        m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 
     63        return impl; 
    6364} 
    6465 
    6566MP_ARGS_BEGIN 
    6667template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 
    67 void manager::add_thread_impl(MP_ARGS_PARAMS) 
     68ThreadIMPL* manager::add_thread_impl(MP_ARGS_PARAMS) 
    6869{ 
    6970        ThreadIMPL* impl = m_zone.allocate<ThreadIMPL, MP_ARGS_TYPES>(MP_ARGS_FUNC); 
    7071        m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 
     72        return impl; 
    7173} 
    7274MP_ARGS_END 
  • lang/c/mpio/trunk/mp/multiplex.pre.h

    r18340 r18476  
    3636        virtual void connected() {} 
    3737public: 
    38         int fd() const { return m_fd; } 
     38        int fd() const 
     39                { return m_fd; } 
     40        void send_data(const char* buf, size_t len) 
     41                { return manager::send_data(m_fd, buf, len); } 
    3942private: 
    4043        int m_fd; 
  • lang/c/mpio/trunk/mp/multiplex/connect.h

    r18340 r18476  
    2323#include <sys/types.h> 
    2424#include <sys/socket.h> 
     25#include <netinet/in.h> 
     26#include <arpa/inet.h> 
    2527#include <errno.h> 
    2628 
     
    3133struct connector { 
    3234private: 
    33         static void initialize(unsigned int num_threads); 
    34         static void destroy(); 
     35        static void initialize(unsigned int num_threads) 
     36        { 
     37                s_instance.reset(new connector(num_threads)); 
     38        } 
     39 
     40        static void destroy() 
     41        { 
     42                s_instance.reset(NULL); 
     43        } 
    3544 
    3645private: 
     46        class entry { 
     47        public: 
     48                entry(socklen_t socklen, void (*callback)(void*, sockaddr*, socklen_t, int), void* obj) 
     49                        m_socklen(socklen), m_callback(callback), m_obj(obj), 
     50                        m_fd(0) {} 
     51                ~entry(); 
     52        public: 
     53                sockaddr* addr() { return (sockaddr*)(((char*)this)+sizeof(entry)); } 
     54                socklen_t addrlen() { return m_socklen; } 
     55                void setfd(int fd) { m_fd = fd; } 
     56        public: 
     57                void callback() 
     58                { 
     59                        (*m_callback)(m_obj, addr(), m_socklen, m_fd); 
     60                } 
     61        private: 
     62                socklen_t m_socklen; 
     63                void (*m_callback)(void*, sockaddr*, socklen_t, int); 
     64                void* m_obj; 
     65                int m_fd; 
     66        }; 
     67 
    3768        class connect_thread : public pthread_thread<connect_thread> { 
    3869        public: 
    39                 // FIXME 
     70                connect_thread(fdnotify<entry*>& return_notify) : 
     71                        m_return_notify(return_notify) 
     72                { 
     73                        m_ev.add(m_notify.getfd(), EV_READ);  // FIXME error 
     74                } 
     75        public: 
     76                void operator() () 
     77                { 
     78                        entry* pe; 
     79                        while(!multiplex::is_end()) { 
     80                                while(m_notify.try_receive(&pe)) { 
     81                                        int sock = ::socket(AF_INET, SOCK_STREAM, 0);  // FIXME AF_INET6 
     82                                        if(sock < 0) { continue; }  // FIXME 
     83                                        if(::connect(sock, pe->addr(), pe->socklen()) < 0) { 
     84                                                ::close(sock); 
     85                                        } else { 
     86                                                pe->setfd(sock); 
     87                                        } 
     88                                        m_return_notify.send(pe); 
     89                                } 
     90                                m_ev.wait();  // FIXME  timeout / error 
     91                        } 
     92                } 
     93        public: 
     94                void connect(entry* e) { m_notify.send(e); } 
     95        private: 
     96                typedef event<> ev_t; 
     97                ev_t m_ev; 
     98                fdnotify<entry*> m_return_notify; 
     99                fdnotify<entry*> m_notify; 
    40100        }; 
    41101 
     102 
     103        typedef source<sizeof(entry)+sizeof(sockaddr_in), 16> source_t; 
     104 
    42105        class connect_handler : public handler { 
     106        public: 
     107                connect_handler(int fd, fdnotify<event*>& return_notify, souce_t& source) : 
     108                        handler(fd), m_return_notify(return_notify), m_source(source) {} 
    43109        pubilc: 
    44                 // FIXME 
     110                void read_event() 
     111                { 
     112                        entry* pe; 
     113                        while(m_return_notify.try_receive(&pe)) { 
     114                                try { 
     115                                        pe->callback(); 
     116                                } catch(...) { 
     117                                        pe->~entry(); 
     118                                        m_source.free(pe); 
     119                                        throw; 
     120                                } 
     121                                pe->~entry(); 
     122                                m_source.free(pe); 
     123                        } 
     124                } 
     125        private: 
     126                fdnotify<>& m_return_notify; 
     127                source_t& m_source; 
    45128        }; 
    46129 
    47130private: 
    48         connector(unsigned int num_threads) 
     131        connector(unsigned int num_threads) : 
     132                m_num_threads(num_threads), 
     133                m_rr_seed(0) 
    49134        { 
     135                m_threads = (connect_thread**)m_source.malloc(sizeof(m_threads*)*num_threads); 
    50136                for(unsigned int i=0; i < num_threads; ++i) { 
    51                         iothreads::add_thread<connect_thread>(); 
    52                         // FIXME 
     137                        m_threads[i] = m_zone.allocate<connect_thread>(m_return_notify); 
    53138                } 
     139                multiplex::add<connect_handler>(m_return_notify.getfd(), m_source); 
    54140        } 
    55141 
    56142        ~connector() {}  // FIXME 
    57143 
    58         // entry 
    59         // return_entry 
     144        void connect_impl(sockaddr* addr, socklen_t addrlen, void (*callback)(void*, sockaddr*, socklen_t, int), void* obj) 
     145        { 
     146                void* mem = m_source.malloc(sizeof(entry)+addrlen); 
     147                try { 
     148                        entry* e = new (mem) entry(addr, addrlen, callback, obj); 
     149                        try { 
     150                                m_threads[m_rr_seed % m_num_threads]->connect(e); 
     151                                ++m_rr_seed; 
     152                        } catch (...) { 
     153                                e->~entry(); 
     154                        } 
     155                } catch(...) { 
     156                        m_source.free(mem); 
     157                } 
     158        } 
    60159 
    61         ~connector() {}  // FIXME 
     160private: 
     161        source_t m_source; 
     162        zone<source> m_zone; 
     163        connect_thread** m_threads; 
     164        fdnotify<entry*> m_return_notify; 
     165        const unsigned int m_num_threads; 
     166        unsigned int m_rr_seed; 
    62167 
    63168private: 
    64169        static scoped_ptr<connector> s_instance; 
    65170        static connector& instance(); 
     171 
     172public: 
     173        template <typename T> 
     174        static void connect(sockaddr* addr, socklen_t addrlen, void (*callback)(void*, sockaddr*, socklen_t, int), T* obj) 
     175        { 
     176                s_instance->connect_impl(addr, addrlen, callback, 
     177                                reinterpret_cast<void*>(obj)); 
     178        } 
    66179}; 
    67180 
     
    74187 
    75188 
     189// FIXME 
     190scoped_ptr<connector> connector::s_instance; 
     191 
     192 
    76193}  // namespace multiplex 
    77194}  // namespace mp 
  • lang/c/mpio/trunk/mp/multiplex_impl.pre.h

    r18340 r18476  
    5757manager::cb_t::cb_t(handler* h, short ev) : 
    5858        m_handler(h), m_event(ev), 
    59         m_wbuffer(NULL), m_allocated(0), m_used(0) {} 
     59        m_wbuffer((char*)malloc(2048)), m_allocated(2048), m_used(0) 
     60{ 
     61        if(!m_wbuffer) { throw std::bad_alloc(); } 
     62} 
    6063 
    6164manager::cb_t::~cb_t() 
     
    98101                m_used = 0; 
    99102        } else { 
    100                 std::memmove(m_wbuffer, m_wbuffer+len, len); 
     103                std::memmove(m_wbuffer, m_wbuffer+len, m_used-len); 
    101104        } 
    102105} 
     
    169172void manager::send_data_impl(int fd, const char* buf, size_t len) 
    170173{ 
    171         m_ev.data(fd).append_buffer(buf, len); 
     174        cb_t& cb( m_ev.data(fd) ); 
     175        cb.append_buffer(buf, len); 
     176        m_wswitch_ctx.push_back(&cb); 
    172177} 
    173178 
     
    196201        while(!m_end_flag) { 
    197202                while( m_ev.next(&fd, &event, &pcb) ) { 
    198                         for(wswitch_ctx_t::iterator it(m_wswitch_ctx.begin()), it_end(m_wswitch_ctx.end()); 
    199                                         it != it_end; 
    200                                         ++it) { 
    201                                 if( try_write(*pcb) ) { 
    202                                         wswitch(*pcb); 
    203                                 } 
    204                         } 
    205                         m_wswitch_ctx.clear(); 
    206203                        if( event & EV_READ ) { 
    207204                                try_read(*pcb); 
     
    211208                        } 
    212209                } 
     210                for(wswitch_ctx_t::iterator it(m_wswitch_ctx.begin()), it_end(m_wswitch_ctx.end()); 
     211                                it != it_end; 
     212                                ++it) { 
     213                        if( try_write(*pcb) ) { 
     214                                wswitch(*pcb); 
     215                        } 
     216                } 
     217                m_wswitch_ctx.clear(); 
    213218                if( (ret = m_ev.wait()) < 0 ) { return ret; } 
    214219        } 
     
    229234bool manager::try_write(cb_t& cb) 
    230235{ 
     236        if(cb.length() == 0) { return false; } 
    231237        ssize_t len = ::write(cb.get().fd(), cb.wbuffer(), cb.length()); 
    232238        if(len < 0) {