| 1 | -module(qnesia). |
|---|
| 2 | -author('kuenishi@gmail.com'). |
|---|
| 3 | %% |
|---|
| 4 | %% qnesia |
|---|
| 5 | %% @brief Mnesia wrapper with Quorum-based replication |
|---|
| 6 | %% @author $Author$ |
|---|
| 7 | %% $Id$ |
|---|
| 8 | %% |
|---|
| 9 | |
|---|
| 10 | -record( data, {key, value}). |
|---|
| 11 | |
|---|
| 12 | -export([ start/1, stop/1, |
|---|
| 13 | write/2, read/1 ]). |
|---|
| 14 | |
|---|
| 15 | -export([ |
|---|
| 16 | mnesia_wrapper/0, coordinator/4, |
|---|
| 17 | start_test/0 |
|---|
| 18 | ]). |
|---|
| 19 | |
|---|
%% Manual smoke test for the master (coordinator) side.
%% Run with: erl -sname a@localhost -s qnesia start_test
%% Starts qnesia with the local node and 'b@localhost' as the member list.
start_test() ->
    Members = [node(), 'b@localhost'],
    qnesia:start(Members).
|---|
| 24 | |
|---|
%% @doc Store `Value' under `Key' by sending a write request to the process
%% registered as `coordinator'.
%%
%% Shell example: qnesia:write(some_key, some_value).
%%
%% Blocks without a timeout and treats the first message found in the
%% caller's mailbox as the coordinator's reply.
%%
%% Returns `ok' when the reply is `{ok, _}' and `ng' for any other reply.
%% (Previously the result of io:format/2 was returned, so every write,
%% including a failed one, looked like `ok'.)
-spec write(term(), term()) -> ok | ng.
write(Key, Value) ->
    Data = #data{key = Key, value = Value},
    coordinator ! {write, Data, self()},
    receive
        Result ->
            %% Original author's note: expected to fail for the time being.
            io:format("write test: ~p~n", [Result]),
            case Result of
                {ok, _} -> ok;
                _ -> ng
            end
    end.
|---|
| 34 | |
|---|
%% @doc Ask the process registered as `coordinator' for the data stored
%% under `Key'.
%%
%% Blocks without a timeout and consumes the first message in the caller's
%% mailbox. If that message is `{ok, DataList}' the list is printed and
%% returned; any other message is returned unchanged.
read(Key) ->
    coordinator ! {read, Key, self()},
    receive
        Reply ->
            case Reply of
                {ok, DataList} ->
                    %% Original author's note: expected to fail for the time being.
                    io:format("read test: ~p~n", [DataList]),
                    DataList;
                _ ->
                    Reply
            end
    end.
|---|
| 44 | |
|---|
%% @doc Start qnesia for the given list of member nodes.
%%
%% An empty list means "just the local node". The coordinator is spawned
%% linked to the caller and registered as `coordinator'; its N, R and W
%% arguments are all set to the number of members. A mnesia wrapper process
%% is then spawned (not linked) on the local node.
%%
%% Returns the pid of the coordinator.
start([]) ->
    start([node()]);
start(Nodes) ->
    Size = length(Nodes),
    %% Coordinator: N = R = W = number of members.
    Coordinator = spawn_link(?MODULE, coordinator, [Size, Size, Size, Nodes]),
    register(coordinator, Coordinator),
    %% Local storage process.
    spawn(?MODULE, mnesia_wrapper, []),
    Coordinator.
|---|
| 54 | |
|---|
%% @doc Send a stop request (reason `hand_command') to the coordinator `Pid'.
%% Returns the message that was sent.
stop(Pid) ->
    StopRequest = {stop, hand_command, self()},
    erlang:send(Pid, StopRequest).
|---|
| 57 | |
|---|
%% @doc Coordinator process loop; handles one request at a time.
%%
%% N       - passed through to the reply collectors
%%           (start/1 passes the member count)
%% R       - number of successful replies required for a read
%% W       - number of successful replies required for a write
%% Members - nodes whose registered `slave' process receives each request
%%
%% Messages handled:
%%   {stop, Reason, From} - forward a stop command to the slaves and end
%%                          the loop
%%   {write, Data, From}  - broadcast, collect acks, send the result to From
%%   {read, Key, From}    - broadcast, collect data, send the result to From
%% Any other 3-tuple is broadcast and then crashes with `case_clause'.
%%
%% NOTE(review): replies carry no request reference, so replies that arrive
%% after a result has been decided stay in the mailbox and can be counted
%% towards the next request.
coordinator(N, R, W, Members) ->
    receive
        {stop, _Reason, _From} ->
            StopMsg = {self(), stop},
            %% Stop every member's slave, not only the local one. The local
            %% node is always included so a local slave is still stopped when
            %% it is not listed in Members. The {Name, Node} destination does
            %% not raise badarg when no such process is registered, unlike
            %% sending to a bare registered name.
            Targets = lists:usort([node() | Members]),
            lists:foreach(fun(Node) -> {slave, Node} ! StopMsg end, Targets),
            StopMsg;
        {Req, Data, From} ->
            ok = send_tx(Req, Data, Members),
            Result =
                case Req of
                    write -> recv_ack(N, W, N - W + 1);
                    read -> recv_data(N, R, N - R + 1, [])
                end,
            From ! Result,
            coordinator(N, R, W, Members)
    end.
|---|
| 76 | |
|---|
| 77 | |
|---|
%% Send the request `{self(), {Req, Data}}' to the process registered as
%% `slave' on every node in Members. Always returns ok.
send_tx(Req, Data, Members) ->
    Envelope = {self(), {Req, Data}},
    lists:foreach(fun(Node) -> {slave, Node} ! Envelope end, Members).
|---|
| 82 | |
|---|
%% Collect write acknowledgements from the caller's mailbox.
%%
%% Q      - number of `{ok, _}' replies still required
%% Remain - number of `{error, _}' replies that still cause failure
%%          (the coordinator passes N - W + 1)
%%
%% Returns `{ok, Remain}' once Q successes have been seen, or
%% `{error, quorum_nodes_failed}' once the failure budget is used up.
%% The failure check takes precedence when both counters are exhausted.
%%
%% The terminal clauses use `=< 0' rather than matching exactly 0, so a
%% counter that starts out negative (W > N + 1 or W < 0) ends the collection
%% immediately instead of waiting forever for a 0 that is never reached.
%%
%% There is no timeout: the call blocks until enough replies arrive.
-spec recv_ack(term(), integer(), integer()) ->
    {ok, integer()} | {error, quorum_nodes_failed}.
recv_ack(_N, _Q, Remain) when Remain =< 0 ->
    {error, quorum_nodes_failed};
recv_ack(_N, Q, Remain) when Q =< 0 ->
    {ok, Remain};
recv_ack(N, Q, Remain) ->
    receive
        {ok, Result} ->
            io:format("ok: ~p.~n", [Result]),
            recv_ack(N, Q - 1, Remain);
        {error, Reason} ->
            io:format("failed: ~p.~n", [Reason]),
            recv_ack(N, Q, Remain - 1)
    end.
|---|
| 96 | |
|---|
%% Collect read replies from the caller's mailbox.
%%
%% Q        - number of `{ok, Data}' replies still required
%% Remain   - number of `{error, _}' replies that still cause failure
%%            (the coordinator passes N - R + 1)
%% DataList - data collected so far; each new item is prepended, so the
%%            most recently received item comes first
%%
%% Returns `{ok, DataList}' once Q successes have been seen, or
%% `{error, DataList}' (with whatever was collected) once the failure budget
%% is used up. The failure check takes precedence when both counters are
%% exhausted.
%%
%% The terminal clauses use `=< 0' rather than matching exactly 0, so a
%% counter that starts out negative (R > N + 1 or R < 0) ends the collection
%% immediately instead of waiting forever for a 0 that is never reached.
%%
%% There is no timeout: the call blocks until enough replies arrive.
-spec recv_data(term(), integer(), integer(), list()) ->
    {ok, list()} | {error, list()}.
recv_data(_N, _Q, Remain, DataList) when Remain =< 0 ->
    {error, DataList};
recv_data(_N, Q, _Remain, DataList) when Q =< 0 ->
    {ok, DataList};
recv_data(N, Q, Remain, DataList) ->
    receive
        {ok, Data} ->
            io:format("recv data: ~p.~n", [Data]),
            recv_data(N, Q - 1, Remain, [Data | DataList]);
        {error, Reason} ->
            io:format("failed: ~p.~n", [Reason]),
            recv_data(N, Q, Remain - 1, DataList)
    end.
|---|
| 108 | |
|---|
%% @doc Entry point of the mnesia wrapper ("slave") process.
%%
%% Standalone usage:
%%   erl -sname b@localhost -s qnesia mnesia_wrapper -noshell -s init stop
%%
%% Makes sure a schema and the disc-based `data' table exist on the local
%% node, starts Mnesia, registers the calling process as `slave', serves
%% requests in mnesia_wrapper_loop/0 and stops Mnesia once the loop returns.
%%
%% An existing schema or table is accepted. Previously the strict matches on
%% `ok' / `{atomic, ok}' made every start after the first one crash with
%% badmatch, because the schema and the disc_copies table persist on disk.
%% Any other setup failure still crashes the process with badmatch.
mnesia_wrapper() ->
    ok = ensure_schema(),
    ok = mnesia:start(),
    ok = ensure_data_table(),
    true = register(slave, self()),
    mnesia_wrapper_loop(),
    mnesia:stop().

%% Create the schema on the local node; an already existing schema is fine.
ensure_schema() ->
    case mnesia:create_schema([node()]) of
        ok -> ok;
        {error, {_Node, {already_exists, _}}} -> ok;
        {error, _} = Error -> Error
    end.

%% Create the `data' table as disc_copies on the local node. If it already
%% exists, wait (up to 5 seconds) until it has been loaded so that reads do
%% not hit a table that is not yet available.
ensure_data_table() ->
    TableOptions = [
        {disc_copies, [node()]},
        {attributes, record_info(fields, data)}
    ],
    case mnesia:create_table(data, TableOptions) of
        {atomic, ok} -> ok;
        {aborted, {already_exists, data}} -> mnesia:wait_for_tables([data], 5000);
        {aborted, Reason} -> {error, Reason}
    end.
|---|
| 121 | |
|---|
%% Receive loop of the mnesia wrapper process. Prints a banner every time the
%% loop is (re)entered, then handles one `{Client, Command}' message:
%%   stop          - reply `{ok, stopped}' and leave the loop
%%   {write, Data} - write Data in a transaction, reply with the outcome
%%   {read, Key}   - dirty-read Key from `data', reply with the outcome
%%   anything else - reply `{error, illegal_message}'
%% Messages that are not 2-tuples are not matched and stay in the mailbox.
mnesia_wrapper_loop() ->
    io:format("entering ~p loop.~n", [?MODULE]),
    receive
        {Client, stop} ->
            io:format("stopping ~p loop.~n", [?MODULE]),
            Client ! {ok, stopped};
        {Client, {write, Record}} ->
            Client ! store_record(Record),
            mnesia_wrapper_loop();
        {Client, {read, Key}} ->
            Client ! fetch_record(Key),
            mnesia_wrapper_loop();
        {Client, Unknown} ->
            io:format("error... illegal_message ~p.~n", [Unknown]),
            Client ! {error, illegal_message},
            mnesia_wrapper_loop()
    end.

%% Write Record inside a Mnesia transaction, log the outcome and return the
%% reply to send back: `{ok, atomic}' or `{error, Reason}'.
store_record(Record) ->
    Writer = fun() -> mnesia:write(Record) end,
    case mnesia:transaction(Writer) of
        {atomic, ok} ->
            io:format("ok, atomic ~p.~n", [Record]),
            {ok, atomic};
        {aborted, Reason} ->
            io:format("error ~p.~n", [Reason]),
            {error, Reason}
    end.

%% Dirty-read Key from the `data' table, log the outcome and return the reply
%% to send back: `{ok, Entry}' for exactly one match, `{error, no_data}' for
%% none, `{error, too_many_data}' otherwise.
fetch_record(Key) ->
    case mnesia:dirty_read(data, Key) of
        [] ->
            io:format("error: no data for ~p.~n", [Key]),
            {error, no_data};
        [Entry] ->
            io:format("ok, atomic ~p.~n", [Entry]),
            {ok, Entry};
        Many ->
            io:format("error: too many data for ~p: ~p.~n", [Key, Many]),
            {error, too_many_data}
    end.
|---|