Changeset 19043

Show
Ignore:
Timestamp:
09/09/08 20:46:02 (6 years ago)
Author:
frsyuki
Message:

lang/c/mpio: implemented mp::iothreads::{manager,writer,listener,timer}

Location:
lang/c/mpio/trunk
Files:
4 added
11 modified

Legend:

Unmodified
Added
Removed
  • lang/c/mpio/trunk/mp/iothreads.pre.h

    r18476 r19043  
    2525#include "mp/zone.h" 
    2626#include "mp/callback_message.h" 
    27 #include "mp/utility.h" 
     27#include <memory> 
    2828 
    2929 
     
    6262        static void submit(callback_message* msg); 
    6363 
     64        static void join(); 
     65 
    6466        static void run(); 
    6567        static void end(); 
    6668        static bool is_end(); 
    67  
    68         // FIXME join 
    6969 
    7070private: 
     
    7878        void submit_impl(callback_message* msg); 
    7979        void run_impl(); 
     80        void join_impl(); 
    8081 
    8182private: 
    8283        blocking_vector<callback_message*> m_messages; 
    83         source<> m_source; 
    84         zone<source<> > m_zone; 
    85  
    86         typedef std::vector<callback_message*> messages_t; 
     84        zone<> m_zone; 
    8785 
    8886private: 
    8987        volatile sig_atomic_t m_end_flag; 
     88        typedef std::vector<pthread_base*> threads_t; 
     89        threads_t m_threads; 
    9090 
    9191private: 
     92        typedef std::vector<callback_message*> messages_t; 
    9293        void clear_message(messages_t& messages); 
    9394 
    9495private: 
    95         static scoped_ptr<manager> s_instance; 
     96        static std::auto_ptr<manager> s_instance; 
    9697        static manager& instance() { return *s_instance; } 
    9798 
     
    101102}; 
    102103 
    103  
    104 inline void initialize() 
    105         { manager::initialize(); } 
    106  
    107 inline void destroy() 
    108         { manager::destroy(); } 
    109104 
    110105inline bool is_end() 
     
    147142#include "mp/iothreads/writer.h" 
    148143#include "mp/iothreads/listen.h" 
     144//#include "mp/iothreads/connect.h" 
    149145#include "mp/iothreads/timer.h" 
    150146 
  • lang/c/mpio/trunk/mp/iothreads/connect.h

    r18476 r19043  
    1919#ifndef MP_IOTHREADS_LISTEN_H__ 
    2020#define MP_IOTHREADS_LISTEN_H__ 
     21 
     22#include <memory> 
    2123 
    2224#include <unistd.h> 
  • lang/c/mpio/trunk/mp/iothreads/listen.h

    r18476 r19043  
    2020#define MP_IOTHREADS_LISTEN_H__ 
    2121 
    22 #include <unistd.h> 
    23 #include <sys/types.h> 
    24 #include <sys/socket.h> 
    25 #include <errno.h> 
    26  
    2722namespace mp { 
    2823namespace iothreads { 
     
    3328        class listen_thread { 
    3429        public: 
    35                 listen_thread(int lsock, void (*callback)(void*, int fd), void* obj) : 
    36                         m_lsock(lsock), m_callback(callback), m_obj(obj) {} 
    37                 void operator() () 
    38                 { 
    39                         while(!iothreads::is_end()) { 
    40                                 int sock = ::accept(m_lsock, NULL, NULL); 
    41                                 if(sock < 0) { 
    42                                         if(errno == EINTR || errno == EAGAIN) { continue; } 
    43                                         // FIXME 
    44                                         throw std::runtime_error("socket broken"); 
    45                                 } else if(sock == 0) { 
    46                                         // FIXME 
    47                                         throw std::runtime_error("socket broken"); 
    48                                 } 
    49                                 iothreads::submit(m_callback, m_obj, sock); 
    50                         } 
    51                 } 
     30                listen_thread(int lsock, 
     31                                void (*callback)(void*, int fd), void* obj); 
     32                void operator() (); 
    5233        private: 
    5334                int m_lsock; 
  • lang/c/mpio/trunk/mp/iothreads/reader.h

    r18476 r19043  
    3737                        { return m_fd; } 
    3838                void send_data(const char* buf, size_t len) 
    39                         { return reader::send_data(m_fd, buf, len); } 
     39                        { return writer::send_data(m_fd, buf, len); } 
    4040        private: 
    4141                int m_fd; 
     
    7272MP_ARGS_END 
    7373 
    74         handler& data_impl(int fd); 
    75         void send_data_impl(int fd, const char* buf, size_t len); 
    7674        //void send_message_impl(int fd, callback_message* msg); 
    7775 
     
    9088                class cb_t { 
    9189                public: 
    92                         cb_t(handler* h, short ev); 
     90                        cb_t(handler* h); 
    9391                        ~cb_t(); 
    9492                public: 
    95                         short event() const; 
    96                         void event(short ev); 
    9793                        handler& get(); 
    9894                private: 
    9995                        handler* m_handler; 
    100                         short m_event; 
    10196                }; 
    10297 
     
    105100                ev_t m_ev; 
    106101 
    107                 fdnotify<> m_notify;  // FIXME 
     102                fdnotify<handler*> m_notify; 
    108103 
    109104        private: 
     
    118113        private: 
    119114                // inter-thread impl 
    120                 void msg_add(handler* h); 
    121                 void msg_send_message(int fd, callback_message* msg); 
     115                void add_impl(handler* h); 
     116                void send_message_impl(int fd, callback_message* msg); 
    122117        }; 
    123118 
     
    155150 
    156151 
    157 short reader::worker::cb_t::event() const 
    158 { 
    159         return m_event; 
    160 } 
    161  
    162 void reader::worker::cb_t::event(short ev) 
    163 { 
    164         m_event = ev; 
    165 } 
     152template <typename Handler> 
     153void reader::add(int fd) 
     154{ 
     155        instance().add_impl<Handler>(fd); 
     156} 
     157 
     158MP_ARGS_BEGIN 
     159template <typename Handler, MP_ARGS_TEMPLATE> 
     160void reader::add(int fd, MP_ARGS_PARAMS) 
     161{ 
     162        instance().add_impl<Handler>(fd, MP_ARGS_FUNC); 
     163} 
     164MP_ARGS_END 
     165 
     166 
     167template <typename Handler> 
     168void reader::add_impl(int fd) 
     169{ 
     170        handler* h = m_zone.allocate<Handler>(fd); 
     171        worker_of(fd).add(h); 
     172        h->connected(); 
     173} 
     174 
     175MP_ARGS_BEGIN 
     176template <typename Handler, MP_ARGS_TEMPLATE> 
     177void reader::add_impl(int fd, MP_ARGS_PARAMS) 
     178{ 
     179        handler* h = m_zone.allocate<Handler>(fd, MP_ARGS_FUNC); 
     180        worker_of(fd).add(h); 
     181        h->connected(); 
     182} 
     183MP_ARGS_END 
     184 
     185void reader::worker::add(handler* h) 
     186{ 
     187        m_notify.send(h); 
     188} 
     189 
     190handler& reader::worker::cb_t::cb_t(handler* h) : 
     191                m_handler(h) {} 
     192 
     193handler& reader::worker::cb_t::~cb_t() {} 
    166194 
    167195handler& reader::worker::cb_t::get() 
    168196{ 
    169197        return *m_handler; 
     198} 
     199 
     200 
     201void reader::worker::operator() () 
     202{ 
     203        cb_t* pcb; 
     204        int fd; 
     205        short event; 
     206        int ret; 
     207        while(!m_end_flag) { 
     208                while( m_ev.next(&fd, &event, &pcb) ) { 
     209                        if(fd == m_notify.getfd()) { 
     210                                handler* h; 
     211                                while(m_notify.try_receive(&h)) { 
     212                                        add_impl(h); 
     213                                } 
     214                        } else { 
     215                                try_read(*pcb); 
     216                        } 
     217                } 
     218                if( (ret = m_ev.wait()) < 0 ) { return ret; } 
     219        } 
     220        return 0; 
     221} 
     222 
     223void reader::worker::add_impl(handler* h) 
     224{ 
     225        m_ev.add(h->fd(), EV_READ, h); 
     226} 
     227 
     228void reader::worker::try_read(cb_t& cb) 
     229{ 
     230        try { 
     231                cb.get().read_event(); 
     232        } catch (...) { 
     233                // FIXME log 
     234                close_connection(cb); 
     235        } 
     236} 
     237 
     238void reader::worker::close_connection(cb_t& cb) 
     239{ 
     240        int fd = cb.get().fd(); 
     241        m_ev.remove(fd, EV_READ); 
     242        ::close(fd); 
     243        iothreads::submit( 
     244                        &object_callback<void (void)>::mem_fun<reader, &reader::remove>, 
     245                        &cb.get()); 
     246        // FIXME delete handler 
    170247} 
    171248 
  • lang/c/mpio/trunk/mp/iothreads/timer.h

    r18476 r19043  
    2020#define MP_IOTHREADS_TIMER_H__ 
    2121 
    22 #include <unistd.h> 
    23  
    2422namespace mp { 
    2523namespace iothreads { 
     
    3028        class timer_thread { 
    3129        public: 
    32                 timer_thread(double interval_sec, void (*callback)(void*), void* obj) : 
    33                         m_interval_usec(interval_sec * 1000 * 1000), 
    34                         m_callback(callback), 
    35                         m_obj(obj) {} 
    36                 void operator() () 
    37                 { 
    38                         while(!iothreads::is_end()) { 
    39                                 usleep(m_interval_usec); 
    40                                 iothreads::submit(m_callback, m_obj); 
    41                         } 
    42                 } 
     30                timer_thread(double interval_sec, 
     31                                void (*callback)(void*), void* obj); 
     32                void operator() (); 
    4333        private: 
    4434                useconds_t m_interval_usec; 
  • lang/c/mpio/trunk/mp/iothreads/writer.h

    r18476 r19043  
    2020#define MP_IOTHREADS_WRITER_H__ 
    2121 
    22 #include "mp/event.h" 
    23 #include "mp/fdnotify.h" 
    24 #include <sys/types.h> 
    25 #include <sys/uio.h> 
    26 #include <unistd.h> 
    27 #include <errno.h> 
     22#include <memory> 
    2823 
    2924namespace mp { 
     
    3530        static void initialize(unsigned int num_threads); 
    3631        static void destroy(); 
    37         ~writer(); 
    3832 
    3933public: 
    40         typedef void (*finalize_t)(void* user, int fd, const void* buf, size_t buflen); 
     34        typedef void (*finalize_t)(void* user, const void* buf, size_t buflen); 
    4135 
    4236        static void send_data(int fd, const void* buf, size_t buflen); 
     
    4842                        finalize_t finalize, void* user, bool thread_safe = false); 
    4943 
    50 private: 
    51  
    52         void send_data_impl(int fd, const void* buf, size_t buflen); 
    53         void send_data_impl(int fd, const void* buf, size_t buflen, finalize_t finalize, 
    54                         void* user, bool thread_safe = false); 
    55  
    56         void send_data_thread_impl(int fd, const void* buf, size_t buflen); 
    57         void send_data_thread_impl(int fd, const void* buf, size_t buflen, 
    58                         finalize_t finalize, void* user, bool thread_safe = false); 
     44public: 
     45        static void finalize_free(void* user, const void* buf, size_t buflen) 
     46                { free((void*)buf); } 
    5947 
    6048private: 
    61         class worker { 
    62         public: 
    63                 worker(); 
    64                 ~worker(); 
    65  
    66         public: 
    67                 void add(int fd, char* ebit, 
    68                                 const void* buf, size_t buflen, size_t off, 
    69                                 finalize_t finalize, void* user, 
    70                                 bool thread_safe); 
    71  
    72         public: 
    73                 void operator() (); 
    74  
    75         private: 
    76                 struct notify_entry { 
    77                         int fd; 
    78                         char* ebit; 
    79                         const void* buf; 
    80                         size_t buflen; 
    81                         size_t off; 
    82                         finalize_t finalize; 
    83                         void* user; 
    84                         bool thread_safe; 
    85                 }; 
    86  
    87                 fdnotify<notify_entry> m_notify; 
    88  
    89                 void add_impl(notify_entry& e); 
    90  
    91         private: 
    92                 class request { 
    93                 public: 
    94                         request(char* ebit, const void* buf, size_t buflen, 
    95                                         finalize_t finalize, void* user, 
    96                                         bool thread_safe); 
    97                         ~request(); 
    98  
    99                 public: 
    100                         void finalize(int fd); 
    101  
    102                 private: 
    103                         char* m_ebit; 
    104                         const void* m_buf; 
    105                         size_t m_buflen; 
    106                         finalize_t m_finalize; 
    107                         void* m_user; 
    108                         bool m_thread_safe; 
    109                 }; 
    110  
    111                 class context { 
    112                 public: 
    113                         context(); 
    114                         ~context(); 
    115  
    116                 public: 
    117                         bool empty(); 
    118                         void add(char* ebit, 
    119                                         const void* buf, size_t buflen, size_t off, 
    120                                         finalize_t finalize, void* user, 
    121                                         bool thread_safe); 
    122  
    123                 public: 
    124                         iovec* vec(); 
    125                         int veclen(); 
    126                         void consumed(int fd, int num); 
    127  
    128                 private: 
    129                         typedef std::vector<iovec> bufvec_t; 
    130                         bufvec_t m_bufvec; 
    131  
    132                         typedef std::vector<request> reqvec_t; 
    133                         reqvec_t m_reqvec; 
    134                 }; 
    135  
    136                 typedef event<context> ev_t; 
    137                 ev_t m_ev; 
    138         }; 
    139  
    140  
    141         class empty_checker { 
    142         public: 
    143                 empty_checker(); 
    144                 ~empty_checker(); 
    145         public: 
    146                 bool empty(); 
    147                 char* use(); 
    148         private: 
    149                 class chunk_t { 
    150                 public: 
    151                         chunk_t(); 
    152                 public: 
    153                         bool empty(); 
    154                         char* use(); 
    155                 private: 
    156                         char m_bit[sizeof(uint64_t)]; 
    157                         unsigned short m_used; 
    158                 }; 
    159                 typedef std::vector<chunk_t> bits_t; 
    160                 bits_t m_bits; 
    161         }; 
    162  
    163         typedef sparse_array<empty_checker> empty_check_t; 
    164         empty_check_t m_empty_check; 
    165  
    166         std::vector<worker*> m_workers; 
    167  
    168 private: 
    169         worker& worker_of(int fd); 
    170         void add_data_with_copy(int fd, const void* buf, size_t buflen); 
    171         void add_data_with_finalize(int fd, const void* buf, size_t buflen, size_t off, 
    172                         finalize_t finalize, void* user, bool thread_safe); 
    173  
    174 private: 
    175         writer(unsigned int num_threads); 
    176  
    177         static scoped_ptr<writer> s_instance; 
    178         static writer& instance(); 
     49        class impl; 
     50        static std::auto_ptr<impl> s_instance; 
     51        static impl& instance(); 
    17952 
    18053private: 
    18154        writer(); 
     55        ~writer(); 
    18256        writer(const writer&); 
    183  
    184 private: 
    185         static void finalize_free(void* user, int fd, const void* buf, size_t buflen) 
    186                 { free((void*)buf); } 
    18757}; 
    18858 
    18959 
    190 void writer::initialize(unsigned int num_threads) 
     60typedef writer::finalize_t finalize_t; 
     61 
     62inline void send_data(int fd, const void* buf, size_t buflen) 
    19163{ 
    192         s_instance.reset(new writer(num_threads)); 
     64        writer::send_data(fd, buf, buflen); 
    19365} 
    19466 
    195 void writer::destroy() 
     67inline void send_data(int fd, const void* buf, size_t buflen, finalize_t finalize, 
     68                void* user, bool thread_safe = false) 
    19669{ 
    197         s_instance.reset(NULL); 
     70        writer::send_data(fd, buf, buflen, finalize, user, thread_safe); 
    19871} 
    199  
    200 writer& writer::instance() 
    201 { 
    202         return *s_instance; 
    203 } 
    204  
    205  
    206 writer::writer(unsigned int num_threads) 
    207 { 
    208         m_workers.reserve(num_threads); 
    209         for(unsigned int i=0; i < num_threads; ++i) { 
    210                 worker* w = iothreads::manager::add_thread<worker>(); 
    211                 m_workers.push_back(w); 
    212         } 
    213 } 
    214  
    215 writer::~writer() {} 
    216  
    217  
    218 void writer::send_data(int fd, const void* buf, size_t buflen) 
    219 { 
    220         instance().send_data_impl(fd, buf, buflen); 
    221 } 
    222  
    223 void writer::send_data_impl(int fd, const void* buf, size_t buflen) 
    224 { 
    225         if(!m_empty_check.test(fd) || m_empty_check.data(fd).empty()) { 
    226                 ssize_t wl = ::write(fd, buf, buflen); 
    227                 if(wl < 0) { 
    228                         if(errno == EAGAIN || errno == EINTR) { 
    229                                 add_data_with_copy(fd, buf, buflen); 
    230                         } else { return; } 
    231                 } else if(wl == 0) { return; } 
    232                 if(static_cast<size_t>(wl) < buflen) { 
    233                         add_data_with_copy(fd, (const void*)(((const char*)buf)+wl), buflen-wl); 
    234                 } else { 
    235                         return; 
    236                 } 
    237         } else { 
    238                 add_data_with_copy(fd, buf, buflen); 
    239         } 
    240 } 
    241  
    242 void writer::add_data_with_copy(int fd, const void* buf, size_t buflen) 
    243 { 
    244         if(!m_empty_check.test(fd)) { m_empty_check.set(fd); } 
    245         void* nbuf = malloc(buflen); 
    246         if(nbuf == NULL) { throw std::bad_alloc(); } 
    247         memcpy(nbuf, buf, buflen); 
    248         worker_of(fd).add(fd, m_empty_check.data(fd).use(), 
    249                         buf, buflen, 0, 
    250                         &writer::finalize_free, NULL, 
    251                         true); 
    252 } 
    253  
    254 void writer::send_data(int fd, const void* buf, size_t buflen, finalize_t finalize, void* user, bool thread_safe) 
    255 { 
    256         instance().send_data_impl(fd, buf, buflen, 
    257                         finalize, user, thread_safe); 
    258 } 
    259  
    260 void writer::send_data_impl(int fd, const void* buf, size_t buflen, finalize_t finalize, void* user, bool thread_safe) 
    261 { 
    262         if(!m_empty_check.test(fd) || m_empty_check.data(fd).empty()) { 
    263                 ssize_t wl = ::write(fd, buf, buflen); 
    264                 if(wl < 0) { 
    265                         if(errno == EAGAIN || errno == EINTR) { 
    266                                 add_data_with_finalize(fd, buf, buflen, 0, finalize, user, thread_safe); 
    267                         } else { return; } 
    268                 } else if(wl == 0) { return; } 
    269                 if(static_cast<size_t>(wl) < buflen) { 
    270                         add_data_with_finalize(fd, buf, buflen, wl, finalize, user, thread_safe); 
    271                 } else { 
    272                         (*finalize)(user, fd, buf, buflen); 
    273                 } 
    274         } else { 
    275                 add_data_with_finalize(fd, buf, buflen, 0, finalize, user, thread_safe); 
    276         } 
    277 } 
    278  
    279 void writer::add_data_with_finalize(int fd, const void* buf, size_t buflen, size_t off, 
    280                 finalize_t finalize, void* user, bool thread_safe) 
    281 { 
    282         if(!m_empty_check.test(fd)) { m_empty_check.set(fd); } 
    283         worker_of(fd).add(fd, m_empty_check.data(fd).use(), 
    284                         buf, buflen, off, 
    285                         finalize, user, 
    286                         thread_safe); 
    287 } 
    288  
    289 writer::worker& writer::worker_of(int fd) 
    290 { 
    291         return *m_workers[fd % m_workers.size()]; 
    292 } 
    293  
    294  
    295 void writer::send_data_thread(int fd, const void* buf, size_t buflen) 
    296 { 
    297         instance().send_data_thread_impl(fd, buf, buflen); 
    298 } 
    299  
    300 void writer::send_data_thread_impl(int fd, const void* buf, size_t buflen) 
    301 { 
    302         add_data_with_copy(fd, buf, buflen); 
    303 } 
    304  
    305 void writer::send_data_thread(int fd, const void* buf, size_t buflen, 
    306                 finalize_t finalize, void* user, bool thread_safe) 
    307 { 
    308         instance().send_data_thread_impl(fd, buf, buflen, 
    309                         finalize, user, thread_safe); 
    310 } 
    311  
    312 void writer::send_data_thread_impl(int fd, const void* buf, size_t buflen, 
    313                 finalize_t finalize, void* user, bool thread_safe) 
    314 { 
    315         add_data_with_finalize(fd, buf, buflen, 0, finalize, user, thread_safe); 
    316 } 
    317  
    318  
    319 writer::empty_checker::empty_checker() 
    320 { 
    321         m_bits.resize(1); 
    322 } 
    323  
    324 writer::empty_checker::~empty_checker() { } 
    325  
    326  
    327 bool writer::empty_checker::empty() 
    328 { 
    329         for(bits_t::iterator it(m_bits.begin()), it_end(m_bits.end()); 
    330                         it != it_end; 
    331                         ++it) { 
    332                 if(!it->empty()) { 
    333                         return false; 
    334                 } 
    335         } 
    336         return true; 
    337 } 
    338  
    339 char* writer::empty_checker::use() 
    340 { 
    341         char* b = m_bits.back().use(); 
    342         if(b) { return b; } 
    343         m_bits.push_back(chunk_t()); 
    344         return m_bits.back().use(); 
    345 } 
    346  
    347 writer::empty_checker::chunk_t::chunk_t() : 
    348         m_used(0) 
    349 { 
    350         memset(m_bit, 0, sizeof(m_bit)); 
    351 } 
    352  
    353 bool writer::empty_checker::chunk_t::empty() 
    354 { 
    355         static const uint64_t ZERO = 0ULL; 
    356         return (memcmp(m_bit, (char*)&ZERO, sizeof(m_bit)) == 0); 
    357 } 
    358  
    359 char* writer::empty_checker::chunk_t::use() 
    360 { 
    361         if(m_used >= sizeof(m_bit)) { return NULL; } 
    362         char* b = &m_bit[m_used++]; 
    363         *b = 1; 
    364         return b; 
    365 } 
    366  
    367  
    368 writer::worker::request::request(char* ebit, const void* buf, size_t buflen, 
    369                 finalize_t finalize, void* user, 
    370                 bool thread_safe) : 
    371         m_ebit(ebit), m_buf(buf), m_buflen(buflen), 
    372         m_finalize(finalize), m_user(user), 
    373         m_thread_safe(thread_safe) 
    374 { } 
    375  
    376 writer::worker::request::~request() { } 
    377  
    378 void writer::worker::request::finalize(int fd) 
    379 { 
    380         *m_ebit = 0; 
    381         if(m_finalize) { 
    382                 if(m_thread_safe) { 
    383                         (*m_finalize)(m_user, fd, m_buf, m_buflen); 
    384                 } else { 
    385                         iothreads::submit(m_finalize, m_user, fd, m_buf, m_buflen); 
    386                 } 
    387         } 
    388 } 
    389  
    390  
    391 writer::worker::context::context() { } 
    392  
    393 writer::worker::context::~context() 
    394 { 
    395         if(!m_reqvec.empty()) { 
    396                 for(reqvec_t::iterator it(m_reqvec.begin()), it_end(m_reqvec.end()); 
    397                                 it != it_end; 
    398                                 ++it) { 
    399                         // FIXME 
    400                         //it->finalize(); 
    401                 } 
    402         } 
    403 } 
    404  
    405  
    406 inline void writer::worker::context::add(char* ebit, 
    407                 const void* buf, size_t buflen, size_t off, 
    408                 finalize_t finalize, void* user, 
    409                 bool thread_safe) 
    410 { 
    411         struct iovec vec = { ((char*)buf)+off, buflen-off }; 
    412         m_bufvec.push_back(vec); 
    413         try { 
    414                 m_reqvec.push_back(request(ebit, buf, buflen, finalize, user, thread_safe)); 
    415         } catch (...) { 
    416                 m_bufvec.pop_back(); 
    417                 // FIXME 
    418         } 
    419 } 
    420  
    421  
    422 iovec* writer::worker::context::vec() 
    423 { 
    424         return &m_bufvec[0]; 
    425 } 
    426  
    427 int writer::worker::context::veclen() 
    428 { 
    429         return m_bufvec.size(); 
    430 } 
    431  
    432 bool writer::worker::context::empty() 
    433 { 
    434         return m_bufvec.empty(); 
    435 } 
    436  
    437 inline void writer::worker::context::consumed(int fd, int num) 
    438 { 
    439         for(int i=0; i < num; ++i) { 
    440                 m_reqvec[i].finalize(fd); 
    441         } 
    442         m_bufvec.erase(m_bufvec.begin(), m_bufvec.begin()+num); 
    443         m_reqvec.erase(m_reqvec.begin(), m_reqvec.begin()+num); 
    444 } 
    445  
    446  
    447 writer::worker::worker() 
    448 { 
    449         m_ev.add(m_notify.getfd(), EV_READ); 
    450 } 
    451  
    452 writer::worker::~worker() {} 
    453  
    454  
    455 void writer::worker::add(int fd, char* ebit, 
    456                 const void* buf, size_t buflen, size_t off, 
    457                 finalize_t finalize, void* user, 
    458                 bool thread_safe) 
    459 { 
    460         notify_entry e = { 
    461                 fd, ebit, 
    462                 buf, buflen, off, 
    463                 finalize, user, thread_safe 
    464         }; 
    465         m_notify.send(e); 
    466 } 
    467  
    468 inline void writer::worker::add_impl(notify_entry& e) 
    469 { 
    470         if(!m_ev.test(e.fd)) { 
    471                 m_ev.add(e.fd, EV_WRITE); 
    472         } 
    473         m_ev.data(e.fd).add(e.ebit, e.buf, e.buflen, e.off, 
    474                         e.finalize, e.user, e.thread_safe); 
    475 } 
    476  
    477 void writer::worker::operator() () 
    478 { 
    479         int fd; 
    480         short event; 
    481         context* pctx; 
    482         while(!iothreads::is_end()) { 
    483                 while(m_ev.next(&fd, &event, &pctx)) { 
    484                         if(fd == m_notify.getfd()) { 
    485                                 notify_entry e; 
    486                                 while(m_notify.try_receive(&e)) { 
    487                                         add_impl(e); 
    488                                 } 
    489                         } else { 
    490                                 context& ctx(m_ev.data(fd)); 
    491                                 ssize_t wl = ::writev(fd, ctx.vec(), ctx.veclen()); 
    492                                 if(wl < 0) { 
    493                                         if(errno == EAGAIN || errno == EINTR) { 
    494                                                 continue; 
    495                                         } else { 
    496                                                 m_ev.remove(fd, EV_WRITE);  // FIXME 
    497                                                 continue; 
    498                                         } 
    499                                 } else if(wl == 0){ 
    500                                         m_ev.remove(fd, EV_WRITE);  // FIXME 
    501                                         continue; 
    502                                 } 
    503                                 int i; 
    504                                 for(i=0; i < ctx.veclen(); ++i) { 
    505                                         if(static_cast<size_t>(wl) >= ctx.vec()[i].iov_len) { 
    506                                                 wl -= ctx.vec()[i].iov_len; 
    507                                         } else { 
    508                                                 ctx.vec()[i].iov_base = (void*)(((char*)ctx.vec()[i].iov_base) + wl); 
    509                                                 ctx.vec()[i].iov_len -= wl; 
    510                                                 break; 
    511                                         } 
    512                                 } 
    513                                 if(i > 0) { 
    514                                         ctx.consumed(fd, i); 
    515                                         if(ctx.empty()) { 
    516                                                 m_ev.remove(fd, EV_WRITE); 
    517                                         } 
    518                                 } 
    519                                 //sleep(1); 
    520                                 //throw std::bad_alloc(); 
    521                         } 
    522                 } 
    523                 m_ev.wait();  // FIXME 
    524         } 
    525 } 
    526  
    527  
    528 // FIXME 
    529 scoped_ptr<writer> writer::s_instance; 
    53072 
    53173 
  • lang/c/mpio/trunk/mp/iothreads_impl.pre.h

    r18476 r19043  
    2525 
    2626 
    27 scoped_ptr<manager> manager::s_instance;  // FIXME 
    28  
    29 void manager::initialize() 
    30 { 
    31         s_instance.reset(new manager()); 
     27template <typename ThreadIMPL> 
     28ThreadIMPL* manager::add_thread() { 
     29        return instance().add_thread_impl<ThreadIMPL>(); 
    3230} 
    33  
    34 void manager::destroy() 
    35 { 
    36         s_instance.reset(NULL); 
    37 } 
    38  
    39  
    40 manager::manager() : 
    41         m_zone(m_source), 
    42         m_end_flag(0) 
    43 { } 
    44  
    45 manager::~manager() 
    46 { } 
    47  
    48  
    49 template <typename ThreadIMPL> 
    50 inline ThreadIMPL* manager::add_thread() { return instance().add_thread_impl<ThreadIMPL>(); } 
    5131 
    5232MP_ARGS_BEGIN 
    5333template <typename ThreadIMPL, MP_ARGS_TEMPLATE> 
    54 inline ThreadIMPL* manager::add_thread(MP_ARGS_PARAMS) { return instance().add_thread_impl<ThreadIMPL>(MP_ARGS_FUNC); } 
     34ThreadIMPL* manager::add_thread(MP_ARGS_PARAMS) { 
     35        return instance().add_thread_impl<ThreadIMPL>(MP_ARGS_FUNC); 
     36} 
    5537MP_ARGS_END 
    5638 
     
    6042{ 
    6143        ThreadIMPL* impl = m_zone.allocate<ThreadIMPL>(); 
    62         m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 
     44        pthread_base* th = m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 
     45        m_threads.push_back(th); 
    6346        return impl; 
    6447} 
     
    6952{ 
    7053        ThreadIMPL* impl = m_zone.allocate<ThreadIMPL, MP_ARGS_TYPES>(MP_ARGS_FUNC); 
    71         m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 
     54        pthread_base* th = m_zone.allocate< pthread_thread<ThreadIMPL> >(impl); 
     55        m_threads.push_back(th); 
    7256        return impl; 
    7357} 
     
    7559 
    7660 
    77 inline void manager::submit(callback_message* msg) 
    78 { 
    79         instance().submit_impl(msg); 
    80 } 
    81  
    82 void manager::submit_impl(callback_message* msg) 
     61inline void manager::submit_impl(callback_message* msg) 
    8362{ 
    8463        try { 
     
    9069} 
    9170 
     71 
     72inline void manager::submit(callback_message* msg) 
     73{ 
     74        instance().submit_impl(msg); 
     75} 
     76 
    9277inline void manager::run() 
    9378{ 
    9479        instance().run_impl(); 
    9580} 
    96  
    97 void manager::run_impl() 
    98 { 
    99         messages_t cache; 
    100         while(!m_end_flag) { 
    101                 m_messages.swap(cache); 
    102                 for(messages_t::iterator it(cache.begin()), it_end(cache.end()); 
    103                                 it != it_end; 
    104                                 ++it) { 
    105                         callback_message* msg = *it; 
    106                         try { 
    107                                 (*msg)(); 
    108                         } catch (...) { 
    109                                 // FIXME 
    110                         } 
    111                 } 
    112                 clear_message(cache); 
    113         } 
    114 } 
    115  
    116 void manager::clear_message(messages_t& messages) 
    117 { 
    118         for(messages_t::iterator it(messages.begin()), it_end(messages.end()); 
    119                         it != it_end; 
    120                         ++it) { 
    121                 delete *it; 
    122         } 
    123         messages.clear(); 
    124 } 
    125  
    12681 
    12782inline void manager::end() 
     
    13691 
    13792 
     93inline void manager::join() 
     94{ 
     95        instance().join_impl(); 
     96} 
     97 
     98inline void manager::join_impl() 
     99{ 
     100        for(threads_t::iterator it(m_threads.begin()), it_end(m_threads.end()); 
     101                        it != it_end; 
     102                        ++it) { 
     103                (*it)->join(); 
     104        } 
     105} 
     106 
     107 
    138108}  // namespace iothreads 
    139109}  // namespace mp 
  • lang/c/mpio/trunk/mp/multiplex.pre.h

    r18607 r19043  
    9393        typedef event<cb_t> ev_t; 
    9494        ev_t m_ev; 
    95         source<> m_source; 
    96         zone<source<> > m_zone; 
     95        zone<> m_zone; 
    9796 
    9897private: 
  • lang/c/mpio/trunk/mp/multiplex_impl.pre.h

    r18476 r19043  
    139139void manager::add_impl(int fd) 
    140140{ 
     141        // FIXME sparse_array 
    141142        handler* h = m_zone.allocate<Handler>(fd); 
    142143        m_ev.add(fd, EV_READ, h, EV_READ); 
     
    148149void manager::add_impl(int fd, MP_ARGS_PARAMS) 
    149150{ 
     151        // FIXME sparse_array 
    150152        handler* h = m_zone.allocate<Handler>(fd, MP_ARGS_FUNC); 
    151153        m_ev.add(fd, EV_READ, h, EV_READ); 
     
    275277{ 
    276278        int fd = cb.get().fd(); 
     279        // FIXME remove handler form sparse_array 
    277280        m_ev.remove(fd, cb.event()); 
    278281        ::close(fd); 
  • lang/c/mpio/trunk/mp/sparse_array_impl.pre.h

    r18183 r19043  
    126126void* sparse_array<T>::set_impl(size_type index) 
    127127{ 
    128         while( base_array.size() * EXTEND_ARRAY_SIZE <= index ) { 
     128        while( base_array.size() <= index / EXTEND_ARRAY_SIZE ) { 
    129129                extend_array(); 
    130130        } 
  • lang/c/mpio/trunk/mp/utility.pre.h

    r18303 r19043  
    2020#define MP_UTILITY_H__ 
    2121 
     22#include <unistd.h> 
     23#include <fcntl.h> 
    2224#include <memory> 
    2325 
     
    3133 
    3234typedef thread_tag<0> main_thread_tag; 
     35 
     36 
     37inline void set_nonblock(int fd) 
     38{ 
     39        // FIXME error 
     40        fcntl(fd, F_SETFL, O_NONBLOCK); 
     41} 
    3342 
    3443