淺談 Ruby 的 Fiber(六)
經過前面幾篇文章的介紹,我們已經初步的了解 Fiber 的性質。這系列的文章目標是利用 Fiber 實現再不透過 Thread 或者 Process 的情境,來實現支援多人連線的 TCP 聊天伺服器。
從這一篇開始,我們就要正式的來挑戰完整的實作了!
在開始之前,我們已經注意到前幾篇的程式碼已經開始有點複雜而且不好維護,所以我們要先做兩件事情來改善這個問題。
- 釐清功能
- 重構
功能分析
因為 Fiber 的特性,我們必須在所有遭遇到 Blocking I/O 的情境下轉為 Nonblocking I/O 來操作,也因此我們回來看一下前面幾篇需要處理 Blocking I/O 的情境。
- 接受連線的
#accept
行為 - 讀取使用者資料的
#gets
行為
為了能夠實現聊天室功能,我們至少還會需要再加入傳送資料給使用者的 #puts
行為。
而這些動作,我們都需要透過一個統一的物件來處理。
我們可以簡單的把他整理成類似像這樣的行為流程圖。
如果照我們原來的做法,會發現很難統一管理 Fiber 來在可以操作時執行對應的動作,所以上圖執行 Fiber.yield
的部分,我們會用一個物件來做統一管理,其他部分則可以先維持原樣。
重構
首先,我們先嘗試實現一個 Selector
來將可以讀取或者寫入的 I/O 物件找出來。
修改後的程式碼大致上會像這樣,我們提供了一個 #register
方法讓暫時無法讀取的物件被記錄下來。
1require 'socket'
2require 'fiber'
3
4# :nodoc:
5class Selector
6 def initialize
7 @fibers = {}
8 end
9
10 def register(io)
11 @fibers[io] = Fiber.current
12 Fiber.yield
13 end
14
15 def resume
16 readable, = IO.select(@fibers.keys)
17 readable.each do |io|
18 @fibers[io].resume
19 @fibers.delete(io)
20 end
21 end
22end
23
24selector = Selector.new
25server = TCPServer.new 3000
26
27loop do
28 begin
29 selector.resume
30
31 client = server.accept_nonblock
32 client.puts 'Hello World'
33
34 Fiber.new do
35 buffer ||= ''
36 begin
37 buffer << client.read_nonblock(1024)
38 puts buffer if buffer.include?("\n")
39 rescue IO::WaitReadable
40 selector.register(client)
41 end
42 end.resume
43 rescue IO::WaitReadable
44 sleep 1
45 retry
46 end
47end
不過這樣是無法正確執行的,因為 IO.select
行為是一個 Blocking I/O 的行為,不過我們可以將大量的 I/O 物件一次性的選取,只要有一個符合條件就可以解除。
而這段程式碼出問題的主因是,當開始後就會進入 IO.select
的阻塞狀態,但是伺服器的阻塞狀態並沒有被加入到其中管理,而造成無法正確運行。
因此,我們要將原本的程式碼再做出一些修正。
1Fiber.new do
2 loop do
3 begin
4 client = server.accept_nonblock
5 client.puts 'Hello World'
6
7 Fiber.new do
8 buffer ||= ''
9 begin
10 buffer << client.read_nonblock(1024)
11 puts buffer if buffer.include?("\n")
12 rescue IO::WaitReadable
13 selector.register(client)
14 retry
15 end
16 end.resume
17 rescue IO::WaitReadable
18 selector.register(server)
19 retry
20 end
21 end
22end.resume
23
24loop do
25 selector.resume
26end
不過修改之後,卻發現因為加入了 Fiber.new
給伺服器後,原本的 retry
和 loop
的角色似乎有點微妙,如果不使用 loop
的話,成功連線後就不會嘗試等待下一個新連線,而失敗的話不使用 retry
一樣也不會繼續嘗試處理新的連線,這樣整個工作分配變得有點混亂。
解析
要解決這樣的問題,最為理想的狀態是在 #accept_nonblock
的下一行馬上使用 Fiber.yield
以便 Fiber#resume
發生時能夠繼續還未完成的動作。
在 Ruby 裡面大部分的 Nonblocking I/O 方法都提供了 exception: false
的選項,讓我們達成這個條件。
小結
雖然開始嘗試重構,但是馬上又發現程式碼變的複雜,在下一篇我們會先嘗試採取 exception: false
的做法調整 Fiber 繼續執行的流程,然後再做一次重構讓程式碼恢復乾淨的狀態。