root/lang/c/mpio/trunk/mp/iothreads_impl.pre.h @ 19040

Revision 18476, 2.7 kB (checked in by frsyuki, 5 years ago)

lang/c/mpio: added mp::iothreads::reader and mp::iothreads::writer

Line 
1//
2// mp::iothreads
3//
4// Copyright (C) 2008 FURUHASHI Sadayuki
5//
6//    Licensed under the Apache License, Version 2.0 (the "License");
7//    you may not use this file except in compliance with the License.
8//    You may obtain a copy of the License at
9//
10//        http://www.apache.org/licenses/LICENSE-2.0
11//
12//    Unless required by applicable law or agreed to in writing, software
13//    distributed under the License is distributed on an "AS IS" BASIS,
14//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15//    See the License for the specific language governing permissions and
16//    limitations under the License.
17//
18
19#ifndef MP_IOTHREADS_IMPL_H__
20#define MP_IOTHREADS_IMPL_H__
21
22
23namespace mp {
24namespace iothreads {
25
26
// Definition of the static member that holds the manager created by
// initialize() and released by destroy().
// NOTE(review): this is a non-inline definition inside a header; if the header
// is included from more than one translation unit the symbol would be defined
// more than once -- confirm how this file is meant to be included.
27scoped_ptr<manager> manager::s_instance;  // FIXME
28
29void manager::initialize()
30{
31        s_instance.reset(new manager());
32}
33
34void manager::destroy()
35{
36        s_instance.reset(NULL);
37}
38
39
40manager::manager() :
41        m_zone(m_source),
42        m_end_flag(0)
43{ }
44
45manager::~manager()
46{ }
47
48
49template <typename ThreadIMPL>
50inline ThreadIMPL* manager::add_thread() { return instance().add_thread_impl<ThreadIMPL>(); }
51
// Variants of add_thread() that take arguments and pass them on to
// add_thread_impl<ThreadIMPL>() on the object returned by instance().
// NOTE(review): the MP_ARGS_* tokens are not defined in this file; presumably
// the preprocessing step applied to *.pre.h files expands this region once per
// supported argument count -- confirm against that script before reshaping
// the lines between MP_ARGS_BEGIN and MP_ARGS_END.
52MP_ARGS_BEGIN
53template <typename ThreadIMPL, MP_ARGS_TEMPLATE>
54inline ThreadIMPL* manager::add_thread(MP_ARGS_PARAMS) { return instance().add_thread_impl<ThreadIMPL>(MP_ARGS_FUNC); }
55MP_ARGS_END
56
57
58template <typename ThreadIMPL>
59ThreadIMPL* manager::add_thread_impl()
60{
61        ThreadIMPL* impl = m_zone.allocate<ThreadIMPL>();
62        m_zone.allocate< pthread_thread<ThreadIMPL> >(impl);
63        return impl;
64}
65
// Argument-taking variants of add_thread_impl().
// Each one allocates the ThreadIMPL in m_zone from the supplied arguments,
// then allocates a pthread_thread<ThreadIMPL> in the same zone with the new
// object's pointer, and returns the ThreadIMPL pointer.
// NOTE(review): the MP_ARGS_* tokens are not defined in this file; presumably
// the preprocessing step applied to *.pre.h files expands this region once per
// supported argument count -- confirm against that script.
66MP_ARGS_BEGIN
67template <typename ThreadIMPL, MP_ARGS_TEMPLATE>
68ThreadIMPL* manager::add_thread_impl(MP_ARGS_PARAMS)
69{
70        ThreadIMPL* impl = m_zone.allocate<ThreadIMPL, MP_ARGS_TYPES>(MP_ARGS_FUNC);
71        m_zone.allocate< pthread_thread<ThreadIMPL> >(impl);
72        return impl;
73}
74MP_ARGS_END
75
76
77inline void manager::submit(callback_message* msg)
78{
79        instance().submit_impl(msg);
80}
81
82void manager::submit_impl(callback_message* msg)
83{
84        try {
85                m_messages.push_back(msg);
86        } catch (...) {
87                delete msg;
88                throw;
89        }
90}
91
92inline void manager::run()
93{
94        instance().run_impl();
95}
96
97void manager::run_impl()
98{
99        messages_t cache;
100        while(!m_end_flag) {
101                m_messages.swap(cache);
102                for(messages_t::iterator it(cache.begin()), it_end(cache.end());
103                                it != it_end;
104                                ++it) {
105                        callback_message* msg = *it;
106                        try {
107                                (*msg)();
108                        } catch (...) {
109                                // FIXME
110                        }
111                }
112                clear_message(cache);
113        }
114}
115
116void manager::clear_message(messages_t& messages)
117{
118        for(messages_t::iterator it(messages.begin()), it_end(messages.end());
119                        it != it_end;
120                        ++it) {
121                delete *it;
122        }
123        messages.clear();
124}
125
126
127inline void manager::end()
128{
129        instance().m_end_flag = 1;
130}
131
132inline bool manager::is_end()
133{
134        return instance().m_end_flag == 1;
135}
136
137
138}  // namespace iothreads
139}  // namespace mp
140
141#endif /* mp/iothreads_impl.h */
142
Note: See TracBrowser for help on using the browser.