root/lang/ruby/nario/rskit/lib/ruby/1.8/rinda/ring.rb @ 17197

Revision 17197, 5.8 kB (checked in by authorNari, 5 years ago)

add rskit

Line 
1#
2# Note: Rinda::Ring API is unstable.
3#
4require 'drb/drb'
5require 'rinda/rinda'
6require 'thread'
7
8module Rinda
9
10  ##
11  # The default port Ring discovery will use.
12
13  Ring_PORT = 7647
14
15  ##
16  # A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts.
17  # Service location uses the following steps:
18  #
19  # 1. A RingServer begins listening on the broadcast UDP address.
20  # 2. A RingFinger sends a UDP packet containing the DRb URI where it will
21  #    listen for a reply.
22  # 3. The RingServer recieves the UDP packet and connects back to the
23  #    provided DRb URI with the DRb service.
24
25  class RingServer
26
27    include DRbUndumped
28
29    ##
30    # Advertises +ts+ on the UDP broadcast address at +port+.
31
32    def initialize(ts, port=Ring_PORT)
33      @ts = ts
34      @soc = UDPSocket.open
35      @soc.bind('', port)
36      @w_service = write_service
37      @r_service = reply_service
38    end
39
40    ##
41    # Creates a thread that picks up UDP packets and passes them to do_write
42    # for decoding.
43
44    def write_service
45      Thread.new do
46        loop do
47          msg = @soc.recv(1024)
48          do_write(msg)
49        end
50      end
51    end
52 
53    ##
54    # Extracts the response URI from +msg+ and adds it to TupleSpace where it
55    # will be picked up by +reply_service+ for notification.
56
57    def do_write(msg)
58      Thread.new do
59        begin
60          tuple, sec = Marshal.load(msg)
61          @ts.write(tuple, sec)
62        rescue
63        end
64      end
65    end
66
67    ##
68    # Creates a thread that notifies waiting clients from the TupleSpace.
69
70    def reply_service
71      Thread.new do
72        loop do
73          do_reply
74        end
75      end
76    end
77   
78    ##
79    # Pulls lookup tuples out of the TupleSpace and sends their DRb object the
80    # address of the local TupleSpace.
81
82    def do_reply
83      tuple = @ts.take([:lookup_ring, DRbObject])
84      Thread.new { tuple[1].call(@ts) rescue nil}
85    rescue
86    end
87
88  end
89
90  ##
91  # RingFinger is used by RingServer clients to discover the RingServer's
92  # TupleSpace.  Typically, all a client needs to do is call
93  # RingFinger.primary to retrieve the remote TupleSpace, which it can then
94  # begin using.
95
96  class RingFinger
97
98    @@broadcast_list = ['<broadcast>', 'localhost']
99
100    @@finger = nil
101
102    ##
103    # Creates a singleton RingFinger and looks for a RingServer.  Returns the
104    # created RingFinger.
105
106    def self.finger
107      unless @@finger
108        @@finger = self.new
109        @@finger.lookup_ring_any
110      end
111      @@finger
112    end
113
114    ##
115    # Returns the first advertised TupleSpace.
116
117    def self.primary
118      finger.primary
119    end
120
121    ##
122    # Contains all discoverd TupleSpaces except for the primary.
123
124    def self.to_a
125      finger.to_a
126    end
127
128    ##
129    # The list of addresses where RingFinger will send query packets.
130
131    attr_accessor :broadcast_list
132
133    ##
134    # The port that RingFinger will send query packets to.
135
136    attr_accessor :port
137
138    ##
139    # Contain the first advertised TupleSpace after lookup_ring_any is called.
140
141    attr_accessor :primary
142
143    ##
144    # Creates a new RingFinger that will look for RingServers at +port+ on
145    # the addresses in +broadcast_list+.
146
147    def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
148      @broadcast_list = broadcast_list || ['localhost']
149      @port = port
150      @primary = nil
151      @rings = []
152    end
153
154    ##
155    # Contains all discovered TupleSpaces except for the primary.
156
157    def to_a
158      @rings
159    end
160
161    ##
162    # Iterates over all discovered TupleSpaces starting with the primary.
163
164    def each
165      lookup_ring_any unless @primary
166      return unless @primary
167      yield(@primary)
168      @rings.each { |x| yield(x) }
169    end
170
171    ##
172    # Looks up RingServers waiting +timeout+ seconds.  RingServers will be
173    # given +block+ as a callback, which will be called with the remote
174    # TupleSpace.
175
176    def lookup_ring(timeout=5, &block)
177      return lookup_ring_any(timeout) unless block_given?
178
179      msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
180      @broadcast_list.each do |it|
181        soc = UDPSocket.open
182        begin
183          soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
184          soc.send(msg, 0, it, @port)
185        rescue
186          nil
187        ensure
188          soc.close
189        end
190      end
191      sleep(timeout)
192    end
193
194    ##
195    # Returns the first found remote TupleSpace.  Any further recovered
196    # TupleSpaces can be found by calling +to_a+.
197
198    def lookup_ring_any(timeout=5)
199      queue = Queue.new
200
201      th = Thread.new do
202        self.lookup_ring(timeout) do |ts|
203          queue.push(ts)
204        end
205        queue.push(nil)
206        while it = queue.pop
207          @rings.push(it)
208        end
209      end
210     
211      @primary = queue.pop
212      raise('RingNotFound') if @primary.nil?
213      @primary
214    end
215
216  end
217
218  ##
219  # RingProvider uses a RingServer advertised TupleSpace as a name service.
220  # TupleSpace clients can register themselves with the remote TupleSpace and
221  # look up other provided services via the remote TupleSpace.
222  #
223  # Services are registered with a tuple of the format [:name, klass,
224  # DRbObject, description].
225
226  class RingProvider
227
228    ##
229    # Creates a RingProvider that will provide a +klass+ service running on
230    # +front+, with a +description+.  +renewer+ is optional.
231
232    def initialize(klass, front, desc, renewer = nil)
233      @tuple = [:name, klass, front, desc]
234      @renewer = renewer || Rinda::SimpleRenewer.new
235    end
236
237    ##
238    # Advertises this service on the primary remote TupleSpace.
239
240    def provide
241      ts = Rinda::RingFinger.primary
242      ts.write(@tuple, @renewer)
243    end
244
245  end
246
247end
248
249if __FILE__ == $0
250  DRb.start_service
251  case ARGV.shift
252  when 's'
253    require 'rinda/tuplespace'
254    ts = Rinda::TupleSpace.new
255    place = Rinda::RingServer.new(ts)
256    $stdin.gets
257  when 'w'
258    finger = Rinda::RingFinger.new(nil)
259    finger.lookup_ring do |ts|
260      p ts
261      ts.write([:hello, :world])
262    end
263  when 'r'
264    finger = Rinda::RingFinger.new(nil)
265    finger.lookup_ring do |ts|
266      p ts
267      p ts.take([nil, nil])
268    end
269  end
270end
271
Note: See TracBrowser for help on using the browser.