Changeset 27802 for lang/erlang

Show
Ignore:
Timestamp:
01/03/09 04:31:13 (4 years ago)
Author:
kuenishi
Message:

lang/erlang/qnesia make almost done...

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • lang/erlang/qnesia/src/qnesia.erl

    r27794 r27802  
    2727        ]). 
    2828 
    29 % master process 
    30 % erl -sname a@localhost -s qnesia start_test 
    31 start_test()-> 
    32     qnesia:start([node(), 'b@localhost']). 
    3329 
    3430write(RemoteNode, {Key, Value})-> 
     
    6056    end. 
    6157 
     58% master process 
     59% erl -sname a@localhost -s qnesia start_test 
     60start_test()-> 
     61    qnesia:start([node(), 'b@localhost']). 
     62 
    6263start([])->    start( [node()] ); 
     64 
    6365start(Nodes)->  
    6466    N = erlang:length( Nodes ), 
     
    6769    register( coordinator, Pid ), 
    6870    % start mnesia_wrapper 
    69     spawn( ?MODULE, mnesia_wrapper, [] ), 
     71    spawn( ?MODULE, replica, [] ), 
    7072    Pid. 
    7173 
     
    8082                case Req of 
    8183                    write-> 
    82                         Result = recv_ack( N, W, N-W+1 ), 
     84                        Result = recv_ack( W, N-W+1 ), 
    8385                        From ! Result; 
    8486                    read-> 
     
    9597    end. 
    9698 
    97  
     99%% @breif  データを書き込んだ結果をFromに返信する。トランザクションのコーディネーター。 
     100%% @spec   tx_coordinator(N, W, Members, Data, From )-> {ok, transaction_done} | {error, Reason }     
     101tx_coordinator(N, W, Members, Data, From)-> 
     102    ok = send_tx( write, Data, Members ),                
     103    case recv_ack( W, N-W+1 ) of 
     104        {ok, Remain}-> 
     105            ok = send_tx( ok, do_commit, Members), 
     106            From ! {self(), {ok, transaction_done}}, 
     107            ok = recv_ack( N+Remain, 1 ), 
     108            {ok, transaction_done}; 
     109        {error, quorum_nodes_failed}-> 
     110            From ! {self(), {error, quorum_nodes_failed}}, 
     111            {error, quorum_nodes_failed}; 
     112        {error, Other} -> 
     113            io:format("unknown error: ~p~n", [Other]), 
     114            From ! {self(), {error, Other}}, 
     115            {error, Other} 
     116    end. 
     117 
     118%% @breif Membersのノード(のslaveというプロセス)全てにDataとリクエストを投げつける。  
     119%%        {Req, Data} = {write, Data}|{ok,do_commit} 
     120%% @spec  send_tx(Req, Data, Nodes)-> ok 
    98121send_tx(_Req, _Data, [])->      ok; 
    99122send_tx( Req,  Data, [Receiver|Members])-> 
    100     {slave, Receiver} ! {self(), {Req, Data}}, 
    101     send_tx(Req, Data, Members). 
    102  
    103 recv_ack(_N,_Q, 0)->      {error, quorum_nodes_failed}; 
    104 recv_ack(_N, 0, Remain)-> {ok, Remain}; 
    105 recv_ack( N, Q, Remain)-> 
    106     receive 
    107         {ok, Result}-> 
     123    try 
     124       {slave, Receiver} ! {self(), {Req, Data}} 
     125    catch 
     126        _->ok 
     127    after 
     128        send_tx(Req, Data, Members) 
     129    end. 
     130 
     131%% @breif Q個のackを待つ。Remainが0になったら諦める。 
     132%% @spec  recv_ack( Q, Remain ) -> {ok, Remain} | {error, quorum_nodes_failed} 
     133recv_ack(_Q, 0)->      {error, quorum_nodes_failed}; 
     134recv_ack( 0, Remain)-> {ok, Remain}; 
     135recv_ack( Q, Remain)-> 
     136    receive 
     137        {ok, _Result}-> 
    108138%           io:format( "ok: ~p.~n", [Result]), 
    109             recv_ack(N, Q-1, Remain); 
     139            recv_ack(Q-1, Remain); 
    110140        {error, Reason}-> 
    111141            io:format( "failed: ~p.~n", [Reason]), 
    112             recv_ack(N, Q, Remain-1) 
     142            recv_ack(Q, Remain-1) 
    113143%    after T-> 
    114144%           {error, timeout} 
     
    128158 
    129159replica()-> 
     160    replica(3, slave). 
     161 
     162%% spawn N workers... 
     163%% replica(N) -> ok 
     164replica(N, ProcessName) when N > 1 -> 
     165    true=register(ProcessName, self()), 
    130166    mnesia_wrapper_init(), 
    131     WriteTx = fun(Data)-> 
    132                       mnesia:transaction( 
    133                         fun()-> 
    134                                 mnesia:write(Data) 
    135                         end) end, 
    136     ReadTx = fun(Key)-> 
    137                      mnesia:transaction( 
    138                        fun()-> 
    139                                mnesia:read(data, Key) 
    140                        end) end, 
    141     mnesia_wrapper_loop(WriteTx, ReadTx), 
    142     mnesia:stop(). 
     167    io:format("entering ~p loop: process name is '~p'.~n", [?MODULE, ProcessName]), 
     168    io:format("if you want to stop: ~p ! { self(), stop }.~n", [ProcessName]), 
     169    Pids=spawn_mnesia_wrapper_loop( N ), 
     170    receive 
     171        {_From, stop}-> 
     172            stop_mnesia_wrapper_loop( Pids ), 
     173            mnesia:stop(), 
     174            true=unregister(ProcessName), 
     175            ok 
     176    end. 
    143177 
    144178%% mnesia wrapper 
    145179%erl -sname b@localhost  -s qnesia mnesia_wrapper  -noshell -s init stop 
    146180mnesia_wrapper_init()-> 
    147     ok=mnesia:create_schema([node()]), 
     181    case mnesia:create_schema([node()]) of 
     182        {error, Reason}-> 
     183            io:format( "create_schema: ~p~n", [Reason]); 
     184        ok -> ok; 
     185        _ -> ng 
     186    end, 
    148187    ok=mnesia:start(), 
    149     {atomic,ok}=mnesia:create_table( data, [ 
     188    case mnesia:create_table( data, [ 
    150189                                {disc_copies, [node()]}, 
    151                                 {attributes, record_info(fields, data)} 
    152                                 ]), 
    153     true=register(slave, self()),%, slave), 
    154     io:format("entering ~p loop.~n", [?MODULE]). 
     190                                {attributes, record_info(fields, data)}, 
     191                                {type, set} 
     192                               ]) of 
     193        {error, Reason2}-> 
     194            io:format( "create_table: ~p~n~p~n", [Reason2, mnesia:error_description(Reason2)]); 
     195        ok -> ok; 
     196        _ -> ng 
     197    end. 
     198 
     199spawn_mnesia_wrapper_loop( 0 )-> []; 
     200spawn_mnesia_wrapper_loop( N ) when N > 0 -> 
     201    Pid = spawn_link( fun()->mnesia_wrapper_loop_init() end ), 
     202    [Pid | spawn_mnesia_wrapper_loop( N-1 )]. 
     203 
     204stop_mnesia_wrapper_loop( [] )-> 
     205    {ok,done}; 
     206stop_mnesia_wrapper_loop( [Pid|Pids] ) -> 
     207    Pid ! {self(), stop}, 
     208    receive 
     209        {_From, {ok, stopped}}-> 
     210            stop_mnesia_wrapper_loop( Pids ); 
     211        {_From, _Other} -> %意味ねー 
     212            stop_mnesia_wrapper_loop( Pids ) 
     213    end. 
     214 
     215mnesia_wrapper_loop_init()-> 
     216    WriteTx = fun(Data)-> 
     217                      mnesia:transaction( fun()->mnesia:write(Data)end ) 
     218              end, 
     219    ReadTx  = fun(Key)-> 
     220                      mnesia:transaction( fun()->mnesia:read(data, Key)end ) 
     221              end, 
     222    mnesia_wrapper_loop(WriteTx, ReadTx). 
    155223 
    156224mnesia_wrapper_loop(WriteTx, ReadTx)-> 
     
    158226        {From, stop} -> 
    159227            io:format("stopping ~p loop.~n", [?MODULE]), 
    160             From ! {ok, stopped}; 
     228            From ! {self(), {ok, stopped}}; 
    161229        {From, {write, Data}}-> 
    162230            case mnesia:transaction(% Fun, Argv, 3 ) of 
     
    167235                                   From ! {self(), {ok, atomic}}, 
    168236                                   receive 
    169                                        {From, {error, do_rollback}}-> mnesia:abort(rollback_done); 
    170                                        {From, {ok, do_commit}}    -> ok; 
    171                                        _->      mnesia:abort(unknown_error) 
     237                                       {From, {error, do_rollback}}-> 
     238                                           mnesia:abort(rollback_done); 
     239                                       {From, {ok, do_commit}}->  
     240                                           ok; 
     241                                       _->      
     242                                           mnesia:abort(unknown_error) 
    172243                                   end; 
    173244                               {aborted, Reason} ->