| 97 | | |
| | 99 | %% @breif データを書き込んだ結果をFromに返信する。トランザクションのコーディネーター。 |
| | 100 | %% @spec tx_coordinator(N, W, Members, Data, From )-> {ok, transaction_done} | {error, Reason } |
| | 101 | tx_coordinator(N, W, Members, Data, From)-> |
| | 102 | ok = send_tx( write, Data, Members ), |
| | 103 | case recv_ack( W, N-W+1 ) of |
| | 104 | {ok, Remain}-> |
| | 105 | ok = send_tx( ok, do_commit, Members), |
| | 106 | From ! {self(), {ok, transaction_done}}, |
| | 107 | ok = recv_ack( N+Remain, 1 ), |
| | 108 | {ok, transaction_done}; |
| | 109 | {error, quorum_nodes_failed}-> |
| | 110 | From ! {self(), {error, quorum_nodes_failed}}, |
| | 111 | {error, quorum_nodes_failed}; |
| | 112 | {error, Other} -> |
| | 113 | io:format("unknown error: ~p~n", [Other]), |
| | 114 | From ! {self(), {error, Other}}, |
| | 115 | {error, Other} |
| | 116 | end. |
| | 117 | |
| | 118 | %% @breif Membersのノード(のslaveというプロセス)全てにDataとリクエストを投げつける。 |
| | 119 | %% {Req, Data} = {write, Data}|{ok,do_commit} |
| | 120 | %% @spec send_tx(Req, Data, Nodes)-> ok |
| 100 | | {slave, Receiver} ! {self(), {Req, Data}}, |
| 101 | | send_tx(Req, Data, Members). |
| 102 | | |
| 103 | | recv_ack(_N,_Q, 0)-> {error, quorum_nodes_failed}; |
| 104 | | recv_ack(_N, 0, Remain)-> {ok, Remain}; |
| 105 | | recv_ack( N, Q, Remain)-> |
| 106 | | receive |
| 107 | | {ok, Result}-> |
| | 123 | try |
| | 124 | {slave, Receiver} ! {self(), {Req, Data}} |
| | 125 | catch |
| | 126 | _->ok |
| | 127 | after |
| | 128 | send_tx(Req, Data, Members) |
| | 129 | end. |
| | 130 | |
| | 131 | %% @breif Q個のackを待つ。Remainが0になったら諦める。 |
| | 132 | %% @spec recv_ack( Q, Remain ) -> {ok, Remain} | {error, quorum_nodes_failed} |
| | 133 | recv_ack(_Q, 0)-> {error, quorum_nodes_failed}; |
| | 134 | recv_ack( 0, Remain)-> {ok, Remain}; |
| | 135 | recv_ack( Q, Remain)-> |
| | 136 | receive |
| | 137 | {ok, _Result}-> |
| 131 | | WriteTx = fun(Data)-> |
| 132 | | mnesia:transaction( |
| 133 | | fun()-> |
| 134 | | mnesia:write(Data) |
| 135 | | end) end, |
| 136 | | ReadTx = fun(Key)-> |
| 137 | | mnesia:transaction( |
| 138 | | fun()-> |
| 139 | | mnesia:read(data, Key) |
| 140 | | end) end, |
| 141 | | mnesia_wrapper_loop(WriteTx, ReadTx), |
| 142 | | mnesia:stop(). |
| | 167 | io:format("entering ~p loop: process name is '~p'.~n", [?MODULE, ProcessName]), |
| | 168 | io:format("if you want to stop: ~p ! { self(), stop }.~n", [ProcessName]), |
| | 169 | Pids=spawn_mnesia_wrapper_loop( N ), |
| | 170 | receive |
| | 171 | {_From, stop}-> |
| | 172 | stop_mnesia_wrapper_loop( Pids ), |
| | 173 | mnesia:stop(), |
| | 174 | true=unregister(ProcessName), |
| | 175 | ok |
| | 176 | end. |
| 151 | | {attributes, record_info(fields, data)} |
| 152 | | ]), |
| 153 | | true=register(slave, self()),%, slave), |
| 154 | | io:format("entering ~p loop.~n", [?MODULE]). |
| | 190 | {attributes, record_info(fields, data)}, |
| | 191 | {type, set} |
| | 192 | ]) of |
| | 193 | {error, Reason2}-> |
| | 194 | io:format( "create_table: ~p~n~p~n", [Reason2, mnesia:error_description(Reason2)]); |
| | 195 | ok -> ok; |
| | 196 | _ -> ng |
| | 197 | end. |
| | 198 | |
| | 199 | spawn_mnesia_wrapper_loop( 0 )-> []; |
| | 200 | spawn_mnesia_wrapper_loop( N ) when N > 0 -> |
| | 201 | Pid = spawn_link( fun()->mnesia_wrapper_loop_init() end ), |
| | 202 | [Pid | spawn_mnesia_wrapper_loop( N-1 )]. |
| | 203 | |
| | 204 | stop_mnesia_wrapper_loop( [] )-> |
| | 205 | {ok,done}; |
| | 206 | stop_mnesia_wrapper_loop( [Pid|Pids] ) -> |
| | 207 | Pid ! {self(), stop}, |
| | 208 | receive |
| | 209 | {_From, {ok, stopped}}-> |
| | 210 | stop_mnesia_wrapper_loop( Pids ); |
| | 211 | {_From, _Other} -> %意味ねー |
| | 212 | stop_mnesia_wrapper_loop( Pids ) |
| | 213 | end. |
| | 214 | |
| | 215 | mnesia_wrapper_loop_init()-> |
| | 216 | WriteTx = fun(Data)-> |
| | 217 | mnesia:transaction( fun()->mnesia:write(Data)end ) |
| | 218 | end, |
| | 219 | ReadTx = fun(Key)-> |
| | 220 | mnesia:transaction( fun()->mnesia:read(data, Key)end ) |
| | 221 | end, |
| | 222 | mnesia_wrapper_loop(WriteTx, ReadTx). |
| 169 | | {From, {error, do_rollback}}-> mnesia:abort(rollback_done); |
| 170 | | {From, {ok, do_commit}} -> ok; |
| 171 | | _-> mnesia:abort(unknown_error) |
| | 237 | {From, {error, do_rollback}}-> |
| | 238 | mnesia:abort(rollback_done); |
| | 239 | {From, {ok, do_commit}}-> |
| | 240 | ok; |
| | 241 | _-> |
| | 242 | mnesia:abort(unknown_error) |