root/lang/erlang/qnesia.erl @ 27118

Revision 27118, 4.1 kB (checked in by kuenishi, 4 years ago)

read/write worked.

  • Property svn:keywords set to Id Author
Line 
1-module(qnesia).
2-author('kuenishi@gmail.com').
3%%
4%% qnesia
5%% @brief  Mnesia wrapper with Quorum-based replication
6%% @author $Author$
7%%  $Id$
8%%
9
10-record( data, {key, value}).
11
12-export([ start/1, stop/1,
13         write/2, read/1 ]).
14
15-export([
16         mnesia_wrapper/0, coordinator/4,
17         start_test/0
18        ]).
19
20% master process
21% erl -sname a@localhost -s qnesia start_test
22start_test()->
23    qnesia:start([node(), 'b@localhost']).
24
25% write test :  n> qnesia:write_test().
26% @spec   write({Key, Value}) -> ok | ng
27write(Key, Value)->
28    Data = #data{key=Key, value=Value},
29    coordinator ! {write, Data , self() },
30    receive
31        Result->
32            io:format("write test: ~p~n", [Result]) %とりあえず失敗する
33    end.
34
35% @spec   read(Key) -> DataList
36read(Key)->
37    coordinator ! {read, Key, self()},
38    receive
39        {ok, DataList}->
40            io:format("read test: ~p~n", [DataList]), %とりあえず失敗する
41            DataList;
42        Other -> Other
43    end.
44
45start([])->    start( [node()] );
46start(Nodes)->
47    N = erlang:length( Nodes ),
48    % start coordinator
49    Pid = spawn_link( ?MODULE, coordinator, [ N, N, N, Nodes ] ),
50    register( coordinator, Pid ),
51    % start mnesia_wrapper
52    spawn( ?MODULE, mnesia_wrapper, [] ),
53    Pid.
54
55stop(Pid) ->
56    Pid ! { stop, hand_command, self() }.
57
58%% @breif coordinator process dedicated to 1 transaction
59%% @spec
60coordinator(N, R, W, Members)->
61    receive
62        {stop, _Reason, _From} ->
63            slave ! {self(), stop}; % send stop command to all members here.
64        {Req,  Data, From}->
65            ok = send_tx( Req, Data, Members ),         
66            case Req of
67                write->
68                    Result = recv_ack( N, W, N-W+1 ),
69                    From ! Result;
70                read->
71                    Result = recv_data( N, R, N-R+1, [] ),
72                    From ! Result
73            end,
74            coordinator( N, R, W, Members )
75    end.
76
77
78send_tx(_Req, _Data, [])->      ok;
79send_tx( Req,  Data, [Receiver|Members])->
80    {slave, Receiver} ! {self(), {Req, Data}},
81    send_tx(Req, Data, Members).
82
83recv_ack(_N,_Q, 0)->      {error, quorum_nodes_failed};
84recv_ack(_N, 0, Remain)-> {ok, Remain};
85recv_ack( N, Q, Remain)->
86    receive
87        {ok, Result}->
88            io:format( "ok: ~p.~n", [Result]),
89            recv_ack(N, Q-1, Remain);
90        {error, Reason}->
91            io:format( "failed: ~p.~n", [Reason]),
92            recv_ack(N, Q, Remain-1)
93%    after T->
94%           {error, timeout}
95    end.
96
97recv_data(_N,_Q, 0,      DataList)-> {error, DataList};
98recv_data(_N, 0,_Remain, DataList)-> {ok, DataList};
99recv_data( N, Q, Remain, DataList)->
100    receive
101        {ok, Data}->
102            io:format( "recv data: ~p.~n", [Data]),
103            recv_data(N, Q-1, Remain, [Data|DataList]);
104        {error, Reason} ->
105            io:format( "failed: ~p.~n", [Reason]),
106            recv_data(N, Q, Remain-1, DataList)
107    end.
108
109%% mnesia wrapper
110%erl -sname b@localhost  -s qnesia mnesia_wrapper  -noshell -s init stop
111mnesia_wrapper()->
112    ok=mnesia:create_schema([node()]),
113    ok=mnesia:start(),
114    {atomic,ok}=mnesia:create_table( data, [
115                                {disc_copies, [node()]},
116                                {attributes, record_info(fields, data)}
117                                ]),
118    true=register(slave, self()),%, slave),
119    mnesia_wrapper_loop(),
120    mnesia:stop().
121
122mnesia_wrapper_loop()->
123    io:format("entering ~p loop.~n", [?MODULE]),
124    receive
125        {From, stop} ->
126            io:format("stopping ~p loop.~n", [?MODULE]),
127            From ! {ok, stopped};
128        {From, {write, Data}}->
129            case mnesia:transaction(% Fun, Argv, 3 ) of
130                   fun()->
131                           mnesia:write(Data)
132                   end) of
133                {atomic, ok}->
134                    io:format("ok, atomic ~p.~n", [Data]),
135                    From ! {ok, atomic};
136                {aborted, Reason} ->
137                    io:format("error ~p.~n", [Reason]),
138                    From ! {error, Reason}
139            end,
140            mnesia_wrapper_loop();
141        {From, {read, Key}}-> % when read,
142            case mnesia:dirty_read( data, Key ) of
143                []->
144                    io:format("error: no data for ~p.~n", [Key]),
145                    From ! {error, no_data};
146                [E]->
147                    io:format("ok, atomic ~p.~n", [E]),
148                    From ! {ok, E};
149                Other->
150                    io:format("error: too many data for ~p: ~p.~n", [Key, Other]),
151                    From ! {error, too_many_data}
152            end,
153            mnesia_wrapper_loop();
154%       {From, be_master}->
155        {From, _Other} ->
156            io:format("error... illegal_message ~p.~n", [_Other]),
157            From ! {error, illegal_message},
158            mnesia_wrapper_loop()
159    end.
Note: See TracBrowser for help on using the browser.