RubyではOSに依存しない(nativeでない)スレッドを提供しており、Threadクラスによりスレッドを扱うことができます。

プログラムの開始と同時に生成されるスレッドをメインスレッドと呼び、なんらかの理由でメインスレッドが終了する時に他の全てのスレッドも終了します。

スレッドの起動時に指定したブロックの実行が終了すると、そのスレッドの実行は終了します。

layout: page ## スレッドを生成する

Thread::new、Thread::start、Thread::forkメソッドにより新しくスレッドを生成することができます。

 puts "Test start"
 
 puts "Create thread"
 
 t = Thread.new do
   puts "Start thread"
   sleep 3
   puts "End thread"
 end
 
 puts "Waiting for the thread to complete"
 t.join
 
 puts "Test compleated"

上記の例は以下の処理を行っています。

  1. puts でメッセージを出力
  2. Thread::newでスレッドを生成
  3. 生成されたスレッドではメッセージを出力し、sleepで3秒停止する
  4. メインスレッドではThread#joinメソッドにより生成したスレッドが停止するのを待つ
  5. puts でメッセージ出力

このスクリプトを生成すると、メッセージを以下の順番で標準出力へ出力します。

 Test start
 Create thread
 Start thread
 Waiting for the thread to complete
 End thread
 Test compleated

スレッドに引数を渡す

引数を渡してスレッドを生成するにはスレッド生成のためのメソッドThread#new、Thread#start、Thread#forkに対して引数を指定します。引数はスレッドのためのブロックへそのまま渡されます。

 puts "Test start"
 
 puts "Create thread"
 
 t = Thread.new("Apple", 10) do |param1, param2|
   puts "Start thread - #{param1} #{param2}" #=> "Start thread - Apple 10"
   sleep 3
   puts "End thread"
 end
 
 puts "Waiting for the thread to complete"
 t.join
 
 puts "Test compleated"

スレッドの終了を待つ

生成したスレッドの終了を待つにはThread#joinメソッドを使用します。スクリプト例は、 スレッドを生成するを参照してください。

スレッドの実行を終了させる

Thread::killメソッドによりメインスレッドからスレッドの実行を終了させることができます。

 puts "Test start"
 
 puts "Create thread"
 
 t = Thread.new do
   puts "Start thread"
   sleep 10
   puts "End thread"
 end
 
 while line = gets
   if line.chop == "."
     Thread::kill(t)
     puts "the thread killed"
     break
   end
 end
 
 puts "Test compleated"

上記のスクリプトでは”.”だけをキーボードから入力すると生成したスレッドの実行を終了し、メッセージを出力した後、メインスレッドも終了します。出力されるメッセージは以下の通りです。

 Test start
 Create thread
 Start thread
 the thread killed
 Test compleated

スレッドを停止する

Thread::stopメソッドによりカレントスレッドの動作を停止することができます。停止したスレッドは他のスレッドからThread#runメソッドにより起動されるまで、停止しています。

 puts "Test start"
 
 puts "Create thread"
 
 t = Thread.new do
   puts "Start thread"
   Thread.stop
   puts "End thread"
 end
 
 gets
 t.run
 t.join
 
 puts "Test compleated"

上記の例では生成したスレッドは”Start thread”というメッセージを出力後、Thread#stopにより停止します。

メインスレッドではgetsにより標準入力からの入力待ちに入り、改行を入力するとThread#runにより停止したスレッドを再開させます。一連の動作により標準出力へ以下が出力されます。

 Test start
 Create thread
 Start thread
 End thread
 Test compleated

実行中のスレッド一覧を取得する

Thread::listメソッドにより実行中スレッドの一覧を配列で取得することができます。

以下の例はメインスレッドから生成した2つのスレッドを、 Thread::killメソッドにより停止させます。

Thread::listはメインスレッドも返却しますので、Thread::currentメソッドによりメインスレッドを取得し、それ以外にメソッドに対してのみThread::killメソッドを発行しています。

 puts "Test start"
 
 puts "Create thread 1"
 t1 = Thread.new do
   sleep 10
   puts "End thread 1"
 end
 
 puts "Create thread 2"
 t2 = Thread.new do
   sleep 10
   puts "End thread 2"
 end
 
 Thread::list.each {|t| Thread::kill(t) if t != Thread::current}
 
 p t1.join
 p t2.join
 
 puts "Test compleated"

このスクリプトを実行すると、以下の内容が標準出力へ出力されます。

 Test start
 Create thread 1
 Create thread 2
 #<Thread:0x2ac2c374 dead>
 #<Thread:0x2ac2c324 dead>
 Test compleated

スレッド間で通信する

Queueクラスを使用するとスレッド間で情報をやりとりすることができます。

 require "thread"
 
 q = Queue.new
 
 t = Thread.new do
   loop do
     n = q.pop
     if n.to_i>=0
       val = Math::sqrt(n)
       puts "Square(#{n}) = #{val}"
     else
       puts "?"
     end
   end
 end
 
 while line = gets
   if line.chop! == "."
     break
   else
     q.push(line)
   end
 end

上記の例はメインスレッドで標準入力から入力された数値をQueueに格納し、生成したスレッドでその平方根を求めて標準出力へ出力します。実行例を以下に示します。

 100
 Square(100) = 10.0
 20
 Square(20) = 4.472135955
 1000
 Square(1000) = 31.6227766
 2
 Square(2) = 1.414213562

例で使用したQueueクラスのメソッドの機能は以下の通りです。

Queue#empty?
キューが空の場合に真
Queue#push(x)
キューにxを追加
Queue#pop
キューからオブジェクトを取り出す

スレッド間の競合を回避する(Mutex)

スレッドを使って並列な処理を記述するとき、スレッド間で共通のリソース(メモリ・ファイルなど)にアクセスする場合にはスレッド間の競合に注意する必要があります。以下の例を見てください。

 ThreadNum = 10
 
 def countup
   counter = 0
   open("count.txt", "r") {|f| counter = f.read.to_i + 1}
   open("count.txt", "w") {|f| f.write counter}
 end
 
 open("count.txt", "w") { |f| f.write 0 }
 
 threads = []
 
 for i in 1..ThreadNum
   threads << Thread::start {countup}
 end
 
 for i in i..ThreadNum
   threads[i-1].join
 end
 
 open("count.txt", "r") {|f| puts f.read.to_i}

このスクリプトは以下の処理を行います。

  1. “count.txt” というファイルを書き込みモードでオープンし、0を書き込む
  2. 10個のスレッドを生成し、それぞれcountupというメソッドを実行する
  3. countupでは以下の処理を行う
  4. “count.txt”というファイルを読み込みモードでオープンし、読み込んだ値に1を加えて変数#counterに設定する
  5. “count.txt”というファイルを書き込みモードでオープンし、変数counterの値を書き込む
  6. Thread#joinで生成した全てのスレッドの終了を待つ
  7. “count.txt”というファイルを読み込みモードでオープンし、読み込んだ値を標準出力へ出力する

処理内容を見ると10個のスレッドを生成し、それぞれ”count.txt”というファイル中にある値に1を加えて保存するのですから、最終的には10という値が標準出力へ出力されそうなのですが、実際には10以下の値が出力されます。

理由としては、countupというメソッドが複数のスレッドで並列で実行されるためです。

例えばスレッド1がファイルを読み込んだ時に値が3だったとします。スレッド1が3に1を加えた値4をファイルへ保存する前に、スレッド2がファイルから値を読み出すと、スレッド1と同様に3という値を読み込んでしまうためです。

 ■スレッド1
 カウンタ値読込(3)---カウンタ値1加算(4)---カウンタ値保存(4)
 
 ■スレッド2
                             カウンタ値読込(3)---カウンタ値1加算(4)---カウンタ値保存(4)
 
 時間の流れ ---------------------------------------------------------------------------> 

このように、競合が発生する可能性のある処理はMutexクラスを使用することで、共通リソースへの並行アクセスによる問題を解消することができます。

Mutexクラスのメソッドを以下に示します。

lock
mutexオブジェクトをロックします。一度に一つのスレッドだけがmutexをロックすることができ、既にロックされているmutexに対してロックを行おうとしたスレッドは、mutexのロックが開放されるまで実行が停止されます。
locked?
mutexがロックされている時に真を返します。
synchronize
mutexをロックしてブロックを実行します。実行後にロックを開放します。
try_lock
mutexをロックしようとして成功した場合は真を返します。ロックできなかった場合にはブロックせず偽を返します。
unlock
mutexのロックを開放します。mutexのロック待ちになっていたスレッドの実行は再開されます。

以下はMutexクラスを使用して先ほどの例を書き直したものです。

 require "thread" # 追加
 
 ThreadNum = 10
 
 def countup
   counter = 0
   @locker.synchronize do # 追加
     open("count.txt", "r") {|f| counter = f.read.to_i + 1}
     open("count.txt", "w") {|f| f.write counter}
   end # 追加
 end
 
 open("count.txt", "w") { |f| f.write 0 }
 
 threads = []
 @locker = Mutex::new # 追加
 
 for i in 1..ThreadNum
   threads << Thread::start {countup}
 end
 
 for i in 0...ThreadNum
   threads[i].join
 end
 
 open("count.txt", "r") {|f| puts f.read.to_i}

このスクリプトでは10個のスレッドにより正しく値が更新されますので、10が標準出力へ出力されます。