root/lang/c/mpio/trunk/src/iothreads_reader.cc @ 22131

Revision 22131, 5.2 kB (checked in by frsyuki, 5 years ago)

lang/c/mpio: iothreads::reader call handler::~handler on worker thread

RevLine 
[19111]1//
2// mp::iothreads::reader
3//
4// Copyright (C) 2008 FURUHASHI Sadayuki
5//
6//    Licensed under the Apache License, Version 2.0 (the "License");
7//    you may not use this file except in compliance with the License.
8//    You may obtain a copy of the License at
9//
10//        http://www.apache.org/licenses/LICENSE-2.0
11//
12//    Unless required by applicable law or agreed to in writing, software
13//    distributed under the License is distributed on an "AS IS" BASIS,
14//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//    See the License for the specific language governing permissions and
16//    limitations under the License.
17//
18
19#include "mp/iothreads.h"
20#include "mp/fdnotify.h"
21#include "mp/event.h"
22#include <sys/types.h>
23#include <sys/uio.h>
24#include <unistd.h>
25#include <errno.h>
26
27namespace mp {
28namespace iothreads {
29
30
31class reader::impl::worker {
32public:
33        worker();
34        ~worker();
35
36public:
37        void add(handler* h);
[21142]38        void close(int fd);
[19504]39        void send_message(int fd, message_t* msg);
[19111]40
41public:
42        void operator() ();
43
44private:
[19504]45        typedef enum {
[21142]46                ADD_HANDLER,
[21965]47                REMOVE_HANDLER,
[19504]48                MESSAGE,
49        } notify_type_t;
[19111]50
[19504]51        struct notify_entry {
52                notify_type_t type;
53                union {
[21142]54                        handler* h;             // ADD_HANDLER
[21965]55                        int fd;                 // REMOVE_HANDLER
[21142]56                        struct {                // MESSAGE
[21965]57                                message_t* msg;
[19504]58                                int fd;
[21348]59                        } message;
[19504]60                } as;
61        };
62        fdnotify<notify_entry> m_notify;
[19111]63
[19504]64        void notify_impl(notify_entry& e);
65
[19111]66private:
[21965]67        typedef event<handler*> ev_t;
68        ev_t m_ev;
[21142]69
70private:
[21965]71        void try_read(handler* h);
72
73        void add_handler(handler* newh);
74        void remove_handler(handler* h);
75        void run_on_handler(handler* h, message_t* msg);
[19111]76};
77
78
79std::auto_ptr<reader::impl> reader::s_instance;
80
81void reader::initialize(unsigned int num_threads)
82{
83        s_instance.reset(new impl(num_threads));
84}
85
86void reader::destroy()
87{
88        s_instance.reset(NULL);
89}
90
91reader::impl& reader::instance()
92{
93        return *s_instance;
94}
95
96
97reader::impl::impl(unsigned int num_threads)
98{
99        m_workers.reserve(num_threads);
100        for(unsigned int i=0; i < num_threads; ++i) {
101                worker* w = iothreads::manager::add_thread<worker>();
102                m_workers.push_back(w);
103        }
104}
105
[21965]106reader::impl::~impl() { }  // FIXME handlers will leak
[19111]107
108
[21965]109void reader::impl::add(handler* newh)
110try {
111        worker_of(newh->fd()).add(newh);
112} catch (...) {
113        delete newh;
114        throw;
[19111]115}
116
[21142]117void reader::impl::close(int fd)
118{
119        worker_of(fd).close(fd);
120}
121
[21965]122void reader::impl::send_message(int fd, message_t* newmsg)
123try {
124        worker_of(fd).send_message(fd, newmsg);
125} catch (...) {
126        delete newmsg;
127        throw;
[19504]128}
129
[19111]130inline reader::impl::worker& reader::impl::worker_of(int fd)
131{
132        return *m_workers[fd % m_workers.size()];
133}
134
135
136reader::impl::worker::worker()
137{
[21965]138        if( m_ev.add(m_notify.getfd(), EV_READ, (handler*)NULL) < 0 ) {
[21348]139                throw event_error(errno, "iothreads reader failed to initialize event notifier");
140        }
[19111]141}
142
143reader::impl::worker::~worker() { }
144
145
146inline void reader::impl::worker::add(handler* h)
147{
[21142]148        notify_entry e = { ADD_HANDLER };
[19504]149        e.as.h = h;
150        m_notify.send(e);
[19111]151}
152
[21142]153inline void reader::impl::worker::close(int fd)
154{
[21965]155        notify_entry e = { REMOVE_HANDLER };
[21142]156        e.as.fd = fd;
157        m_notify.send(e);
158}
159
[19504]160inline void reader::impl::worker::send_message(int fd, message_t* msg)
161{
[19740]162        notify_entry e = { MESSAGE };
[21965]163        e.as.message.msg = msg;
[21348]164        e.as.message.fd = fd;
[21965]165        m_notify.send(e);
[19504]166}
167
[19111]168void reader::impl::worker::operator() ()
[19911]169try {
[19111]170        int fd;
171        short event;
172        while(!iothreads::is_end()) {
173                if(m_ev.wait(1000) < 0) {   // FIXME
174                        if(errno != EAGAIN && errno != EINTR) {
[21348]175                                throw event_error(errno, "iothreads reader event failed");
[19111]176                        }
177                }
178                while(m_ev.next(&fd, &event)) {
179                        if(fd == m_notify.getfd()) {
[19504]180                                notify_entry e;
181                                while(m_notify.try_receive(&e)) {
182                                        notify_impl(e);
[19111]183                                }
184                        } else {
[21965]185                                try_read(m_ev.data(fd));
[19111]186                        }
187                }
188        }
[19911]189} catch (...) {
190        if(!iothreads::is_end()) { throw; }
[19111]191}
192
193inline void reader::impl::worker::try_read(handler* h)
[21965]194try {
195        h->read_event();
196} catch (...) {
197        remove_handler(h);
[19111]198}
199
[21965]200
[19504]201inline void reader::impl::worker::notify_impl(notify_entry& e)
[19111]202{
[21142]203        if(e.type == ADD_HANDLER) {
[19504]204                handler* h = e.as.h;
[21142]205                add_handler(h);
[19504]206
[21965]207        } else if(e.type == REMOVE_HANDLER) {
[21142]208                int fd = e.as.fd;
209                if(!m_ev.test(fd)) { return; }
[21965]210                remove_handler(m_ev.data(fd));
[21142]211
212        } else {  // e.type == MESSAGE
[21348]213                int fd = e.as.message.fd;
[21965]214                std::auto_ptr<message_t> msg(e.as.message.msg);
[19740]215                if(!m_ev.test(fd)) { return; }
[21965]216                run_on_handler(m_ev.data(fd), msg.get());
[19111]217        }
218}
219
[21965]220namespace {
[22131]221static void close_fd(int fd) {
[21965]222        ::close(fd);
223}
224}
225
226inline void reader::impl::worker::add_handler(handler* newh)
227try {
228        if( m_ev.add(newh->fd(), EV_READ, newh) < 0 ) {
229                throw event_error(errno, "iothreads reader failed to add event");
[21142]230        }
[21965]231        // FIXME call newh->connected()?
232} catch (...) {
233        ::close(newh->fd());
234        delete newh;
235        // FIXME log?
[21142]236}
[19111]237
[21965]238void reader::impl::worker::remove_handler(handler* h)
[22131]239{
240        int fd = h->fd();
241        m_ev.remove(fd, EV_READ);
242        delete h;
[21965]243        //::close(fd);  // FIXME close(2)はロジックスレッドで
[21142]244        // 1. fdに対してイベント到着
245        // 2. 直後にclose
246        // 3. 直後にaccept, closeしたfdと同じ番号になり得る
247        // 4. イベントに対して返信するためにsend_data
248        // 5. 期待とは異なるfdに返信されてしまう
[22131]249        try {
250                iothreads::submit(close_fd, fd);
251        } catch (...) {
252                ::close(fd);
253        }
[21142]254}
255
[21965]256inline void reader::impl::worker::run_on_handler(handler* h, message_t* msg)
257try {
258        (*msg)(*h);
259} catch (...) {
260        // FIXME log?
261}
[21142]262
[21965]263
[19111]264}  // namespace iothreads
265}  // namespace mp
266
Note: See TracBrowser for help on using the browser.