Ruby 3多线程并行:Ractor
Ruby 3多线程并行:Ractor
Ruby 3 Ractor官方手册:https://github.com/ruby/ruby/blob/master/doc/ractor.md
在Ruby 3之前,使用Thread来创建新的线程,但这种方式创建的多线程是并发而非并行的,MRI有一个全局解释器锁GIL来控制同一时刻只能有一个线程在执行:
1 | # main Thread |
Ruby 3通过Ractor(Ruby Actor,Actor模型通过消息传递的方式来修改状态)支持真正的多线程并行,多个Ractor之间可并行独立运行。
1 | # main Ractor |
需注意,每个Ractor中至少有一个原生Ruby线程,但每个Ractor内部都拥有独立的GIL,使得Ractor内部在同一时刻最多只能有一个线程在运行。从这个角度来看,Ractor实际上是解释器线程,每个解释器线程拥有一个全局解释器锁。
如果main Ractor退出,则其他Ractor也会收到退出信号,就像main Thread退出时,其他Thread也会退出一样。
创建Ractor
使用Ractor.new
创建一个Ractor实例,创建实例时需指定一个语句块,该语句块中的代码会在该Ractor中运行。
1 | r = Ractor.new do |
可在new方法的参数上为该Ractor实例指定名称:
1 | r = Ractor.new(name: "ractor1") do |
new方法也可指定其他参数,这些参数必须在name参数之前,且这些参数将直接原样传递给语句块参数:
1 | arr = [11, 22, 33] |
关于new的参数,稍后还会有解释。
可使用Ractor.current
获取当前的Ractor实例,使用Ractor.count
获取当前存活的Ractor实例数量。
Ractor之间传递消息
Ractor传递消息的方式分两种:
- Push方式:向某个特定的Ractor实例推送消息,可使用
r.send(Msg)
或别名r << Msg
向该Ractor实例传送消息,并在该Ractor实例内部使用Ractor.receive
或别名Ractor.recv
或它们的同名私有方法来接收推送进来的消息- Ractor还提供了
Ractor.receive_if {expr}
方法,表示只在expr为true时才接收消息,receive
等价于receive_if {true}
- Ractor还提供了
- Pull方式:从某个特定的Ractor实例拉取消息,可在该Ractor实例内部使用
Ractor.yield
向外传送消息,并在需要的地方使用r.take
获取传输出来的消息Ractor.new
的语句块返回值,相当于Ractor.yield
,它也可被r.take
接收
因此,对于Push方式,要求知道消息传递的目标Ractor,对于Pull方式,要求知道消息的来源Ractor。
1 | # yield + take |
使用new方法创建Ractor实例时,可指定new的参数,这些参数会被原样传递给Ractor的语句块参数。
1 | arr = [11, 22, 33] |
实际上,new的参数等价于在Ractor语句块的开头使用了Ractor.receive
接收消息:
1 | r = Ractor.new 'ok' { |msg| msg } |
消息端口
Ractor之间传递消息时,实际上是通过Ractor的消息端口进行传递的。
每个Ractor都有自己的incoming port和outgoing port:
- incoming port:是该Ractor接收消息的端口,
r.send
和Ractor.receive
使用该端口- 每个incoming port都连接到一个大小不限的队列上
r.send
传入的消息都会写入该队列,由于该队列大小不限,因此r.send
从不阻塞Ractor.receive
从该队列弹出消息,当队列为空时,Ractor.receive
被阻塞直到新消息出现- 可使用
r.close_incoming
关闭incoming port,关闭该端口后,r.send
将直接报错,Ractor.receive
将先从队列中取数据,当队列为空后,再调用Ractor.receive
将报错
- outgoing port:是该Ractor向外传出消息的端口,
Ractor.yield
和r.take
使用该端口Ractor.yield
或Ractor语句块返回时,消息从outgoing port流出- 当没有
r.take
接收消息时,r内部的Ractor.yield
将被阻塞 - 当r内部没有
Ractor.yield
时,r.take
将被阻塞 Ractor.yield
从outgoing port传出的消息可被任意多个r.take
等待,但只有一个r.take
可获取到该消息- 可使用
r.close_outgoing
关闭outgoing port,关闭该端口后,再调用r.take
和Ractor.yield
将直接报错。如果r.take
正被阻塞(等待Ractor.yield
传出消息),关闭outgoing port操作将取消所有等待中的take并报错
Ractor.select等待消息
可使用Ractor.select(r1,r2,r3...)
等待一个或多个Ractor实例outgoing port上的消息(因此,select主要用于等待Ractor.yield
的消息),等待到第一个消息后立即返回。
Ractor.select
的返回值格式为[r, obj]
,其中:
- r表示等待到的那个Ractor实例
- obj表示接收到的消息对象
例如:
1 | r1 = Ractor.new{'r1'} |
通常来说,会使用Ractor.select
来轮询等待多个Ractor实例的消息,通用化的处理流程参考如下:
1 | # 充当管道功能的Ractor:接收消息并发送出去,并不断循环 |
此外,Ractor.select
除了可等待消息外,也可以用来yield传递消息,更多用法参考官方手册:Ractor.select。
Ractor并行时如何避免竞态
多个Ractor之间是可并行运行的,为了避免Ractor之间传递数据时出现竞态问题,Ractor采取了一些措施:
- 对于不可变对象,它们可直接在Ractor之间共享,此时传递它们的引用
- 对于可变对象,它们不可直接在Ractor之间共享,此时传递数据时,默认先按字节逐字节拷贝,然后后传递副本
- 也可以显式指定移动数据,将某份数据从Ractor1移动到另一个Ractor2中,即转移数据的所有权(参考Rust的所有权规则),转移所有权后,原始所有者Ractor中将无法再访问该数据
传递可共享对象:传递引用
可共享的对象:自动传递它们的引用,效率高
- 不可变对象可在Ractor之间直接共享(如Integer、symbol、true/false、nil),如:
i=123
:i是可共享的s="str".freeze
:s是可共享的h={c: Object}.freeze
:h是可共享的,因为Object是一个类对象,类对象是可共享的a=[1,[2],3].freeze
:a不可共享,因为冻结后仍然包含可变的[2]
- Class/Module对象,即类对象自身和模块对象自身是可共享的
- Ractor对象自身是可共享的
例如:
1 | i = 33 |
值得注意的是,Ractor对象是可共享的,因此可将某个Ractor实例传递给另一个Ractor实例。例如:
1 | pipe = Ractor.new do |
传递不可共享对象:传递副本
绝大多数对象不是可直接共享的。在Ractor之间传递不可共享的对象时,默认会传递deep-copy后的副本,即按字节拷贝的方式拷贝该对象的每一个字节。这种方式效率较低。
例如:
1 | arr = [11, 22, 33] # 数组是可变的,不可共享 |
从结果看,两个Ractor内的arr不是同一个对象。
需注意,对于全局唯一的对象来说(比如数值、nil、false、true、symbol),逐字节拷贝时并不会拷贝它们。例如:
1 | arr = %i[lang action sub] |
注意,Thread对象无法拷贝,因此无法在Ractor之间传递。
转移数据所有权
还可以让r.send(msg, move: true)
和Ractor.yield(msg, move: true)
传递数据时,明确表示要移动而非拷贝数据,即转移数据的所有权(从原来的所有者Ractor实例转移到目标Ractor实例)。
无论是可共享还是不可共享的对象,都可以转移所有权,只不过转移可共享对象的所有权没有意义,因为转移之后,原所有者仍然拥有所有权。
因此,通常只对不可共享的数据来转移所有权,转移所有权后,原所有者将无法访问该数据。
1 | str = "hello" |
值得注意的是,移动的本质是内存拷贝,它底层也一样是逐字节拷贝原始数据的过程,所以移动传递数据的效率和传递副本数据的效率是类似的。移动传递和传递副本的区别之处在于所有权,移动传递后,原所有者Ractor实例将无法访问该数据,而拷贝传递方式则允许原所有者访问。
注意,Thread对象无法转移所有权,因此无法在Ractor之间传递。
不可共享变成可共享:Ractor.make_shareable
对于不可共享的数据obj,可通过Ractor.make_shareable(obj)
方法将其转变为可共享的数据,默认转变的方式是逐层次地递归冻结obj。也可指定额外的参数Ractor.make_shareable(obj, copy: true)
,此时将深拷贝obj得其副本,再让副本(逐层递归冻结)转变为可共享数据。
例如:
1 | arr = %w[lang action sub] |
输出:
1 | 60 |
示例
工作者线程池:
1 | require 'prime' |
Pipeline:
1 | # pipeline with yield/take |