root/lang/objective-cplusplus/i3/trunk/src/mil/include/mil/Module.h @ 32286

Revision 32286, 3.9 kB (checked in by saturday06, 6 years ago)

fu

Line 
1#pragma once
2#include "Mil.h"
3#include "Thread.h"
4#include "ModuleExecuteProxy.h"
5
6namespace mil
7{
8
9/**
10 * public members should be thread safe
11 */
12template <typename Child>
13class Module : public Thread<Module<Child> >
14{
15public:
16    pool::Producer producer;
17    pool::Consumer consumer;
18private:
19    class EventBase : public boost::intrusive::slist_base_hook<>, public boost::noncopyable
20    {
21    public:
22        void (*execute)(void* event, void* target);
23        intptr_t owner_id;
24    };
25
26    class EventList : public boost::noncopyable
27    {
28        boost::intrusive::slist<EventBase, boost::intrusive::constant_time_size<false>, boost::intrusive::cache_last<true> > l;
29        boost::details::pool::default_mutex m;
30        typedef boost::details::pool::guard<boost::details::pool::default_mutex> guard;
31    public:
32        void push(EventBase& e)
33        {
34            guard g(m);
35            l.push_back(e);
36        }
37
38        void pop()
39        {
40            guard g(m);
41            l.pop_front();
42        }
43
44        EventBase* top()
45        {
46            guard g(m);
47            if (l.size() == 0)
48            {
49                return NULL;
50            }
51            return &l.front();
52        }
53    };
54
55    template <typename T>
56    class Event : public EventBase
57    {
58    public:
59        T data;
60    };
61
62    friend class Module_test;
63
64private:
65    EventList list;
66    Conditional c;
67    bool repost;
68    atomic<bool> breakLoopRequest;
69    atomic<bool> isWaiting;
70
71public:
72    Module() : repost(false), breakLoopRequest(false), isWaiting(false)
73    {
74    }
75
76    Child& getChild()
77    {
78        return *static_cast<Child*>(this);
79    }
80
81protected:
82    void repostEvent()
83    {
84        repost = true;
85    }
86
87
88    template <typename Data, typename Target>
89    static void dispatcher(void* event_, void* target_)
90    {
91        if (!event_ || !target_)
92        {
93            return;
94        }
95
96        // XXX type safety??
97        Target& target = *static_cast<Target*>(target_);
98        Event<Data>& event = *static_cast<Event<Data>*>(event_);
99        target.list.pop();
100        target.repost = false;
101        ModuleExecuteProxy<Target>::execute(target, event.data);
102        if (target.repost) {
103            target.list.push(event);
104            return;
105        }
106        pool::MemoryList* m = target.consumer.work((void*)&event, sizeof(event), event.owner_id);
107        if (!m) {
108            return;
109        }
110        pool::ReturnMemoryEvent e;
111        e.memory = m;
112        void* t = threads[event.owner_id].load();
113        if (!t) {
114            return;
115        }
116        (reinterpret_cast<Module<Child> * >(t))->post(e, target.producer);
117    }
118public:
119    template <typename T>
120    void post(const T& event, mil::pool::Producer& sender_producer)
121    {
122        void* p = sender_producer.malloc<sizeof(Event<T>)>();
123        if (p == NULL)
124        {
125            // XXX
126            return;
127        }
128
129        Event<T>* e = new(p) Event<T>;
130        e->data = event;
131        e->execute = dispatcher<T, Child>;
132        e->owner_id = this->thread_id;
133
134        list.push(*e);
135        if (isWaiting.load()) {
136            c.signal();
137        }
138    }
139public:
140    void destroy()
141    {
142        breakLoopRequest.store(true);
143    }
144    ~Module() {
145        destroy();
146    }
147    void execute_front()
148    {
149        EventBase* e = list.top();
150        if (e)
151        {
152            e->execute((void*)e, (void*)this);
153        }
154    }
155
156protected:
157    void loop()
158    {
159        while (!breakLoopRequest.load())
160        {
161            EventBase* e = list.top();
162            if (e)
163            {
164                e->execute((void*)e, (void*)this);
165            }
166            else
167            {
168                isWaiting.store(true);
169                c.wait();
170                isWaiting.store(false);
171            }
172        }
173        breakLoopRequest.store(false);
174    }
175};
176
177}
Note: See TracBrowser for help on using the browser.