使用Ruby线程库(Thread)
使用Ruby线程库(Thread)
Thread是Ruby的线程库,Thread库已经内置在Ruby中,但如果想要使用线程安全的Queue、Mutex以及条件变量等,则需要手动require 'thread'
。
主线程main
默认情况下,每个Ruby进程都具备一个主线程main,如果没有创建新的线程,所有的代码都将在这个主线程分支中执行。
使用Thread.main()
类方法可获取当前线程组的主线程,使用Thread.current()
可以获取当前正在执行的线程分支。使用Thread.list()
可获取当前进程组中所有存活的线程。
1 | p Thread.main |
可见,线程其实是一个Thread类的实例对象。
创建Ruby线程
使用Thread库的new()、start()、fork()可创建线程,它们几乎等价,且后两者是别名关系。
如果主线程先执行完成,主线程将直接退出,主线程的退出将会终止进程,使得其它线程也会退出。
1 | Thread.new {puts "hello"} |
上述代码几乎总是会输出world
,然后退出,主线程的退出使得子线程不会输出"hello"
。之所以总是会输出world而不是输出hello,这和Ruby的线程调度有关,在后面的文章中会详细解释Ruby中的线程调度。
join()和value()等待线程
如果想要等待某个线程先执行完成,可使用t.join()
,如果线程t尚未退出,则join()会阻塞。可以在任意线程中调用t.join()
,谁调用谁等待。
1 | t = Thread.new { puts "I am Child" } |
还可以将多个线程对象放进数组,然后执行遍历join,另一种常见的做法是使用map{}.each(&:join)
的方式:
1 | threads = [] |
t.value()
和t.join()
类似,不同之处在于t.value()
在内部调用t.join()
等待线程t之后,还会在等待成功时取得该线程的返回值。
1 | a = Thread.new { 2 + 2 } |
注意,对于Ruby来说,无论是否执行join()操作,任务执行完成的线程都会马上被操作系统回收(从OS线程表中删除),但被回收的线程仍然能够使用value()
方法来获取被回收线程的返回值。之所以会这样,我个人猜想,也许是因为Ruby内部已经帮我们执行了join操作并将线程返回值保存在Ruby内部,这样对于用户来说就更加安全,而且用户执行join()或value()操作,可能是在等待Ruby内部的这个值的出现。
线程的异常处理
默认情况下,当某个非main线程中抛出异常后,该线程将因异常而终止,但是它的终止不会影响其它线程。
1 | t = Thread.new {raise "hello"} # 抛出异常 |
如果使用了t.join()
或t.value()
去等待抛出异常的线程t,异常将会传播给调用这两个方法的线程。例如主线程调用t.join
,如果t会抛出一次异常,那么主线程在等待过程中还会抛出一次异常。
1 | t = Thread.new {raise "hello"} # 抛出异常 |
如果想要让任意线程出现异常时终止整个程序,可设置类方法Thread.abort_on_exception
为true,它会在任意子线程抛出异常后自动传播给main线程,从而终止进程:
1 | Thread.abort_on_exception = true |
如果想要让某个特定的线程出现异常时终止整个程序,可设置同名的实例方法t.abort_on_exception
为true,只有t线程异常时才会终止程序。
1 | t1 = Thread.new { raise "Error from t1" } |
另外,线程实例方法t.raise()
可以直接在线程t抛出异常。
需注意,Ruby线程有一个巨大的缺点:无论是raise抛出异常还是各种终止(比如kill、exit),都不会执行ensure子句。
线程的状态和生命周期
Ruby中的线程具有5种状态,可通过t.status()
查看,该方法有5种对应的返回值:
1 | - run: 线程正在运行(running)或可运行(runnable) |
另外,还有两种统称状态:
- alive:存活的线程,等价于run + sleep
- stop:已停止的线程,等价于sleep + dead(false+nil)
可分别使用alive?()
和stop?()
来判断线程是否属于这两种统称状态。
此外:
1 | Kernel.sleep:让当前线程睡眠指定时长,无参数则永久睡眠,线程将进入睡眠队列 |
几个注意事项:
- 这里5个终止线程的方式效果上是完全等价的,三个实例方法是别名关系,而两个类方法的内部也都是调用线程对象的kill
- 最好要不加区分地看待run和wakeup
- 对于Thread.pass,除了知道它转让CPU的行为是确定的,不要对它假设任何额外的行为,比如不要认为出让CPU后一定会调度到其它Ruby线程,很有可能会在调度其它一些非Ruby线程后再次先调度到本线程而非其它Ruby线程
- 需注意,无论是raise抛出异常还是各种终止(比如kill、exit),都不会执行ensure子句
线程私有变量和局部变量
Ruby进程内的所有线程共享进程的虚拟地址空间,所以共享了一些数据。
但线程是语句块或者Proc对象,所以语句块内部创建的变量是在当前线程栈内部的,是每个线程私有的变量。
1 | # 主线程中的变量 |
Ruby为线程提供了局部变量共享的概念,每个线程对象都可以有自己的局部数据空间(即线程本地变量),线程对象的局部空间互不影响,比如两个线程中同时进行正则匹配,两个线程的$~
是不一样且互不影响的。
线程对象t
的局部数据空间是t[key]=value
,即一个名为t的hash结构,因为对象t是可以共享的,所以它的局部空间也是共享的。
1 | t1 = Thread.new do |
所以,有这么几个方法:
1 | t[key] |
此外还有一个fetch()方法,类似于Hash的fetch(),默认情况下访问不存在的key会异常,可指定默认值或通过语句块返回默认值。
严格来说,从Ruby 1.9出现Fiber之后,t[]
不再是线程本地变量(thread-local),而是纤程(Fiber)本地变量(fiber-local)。但也支持使用线程本地变量:
1 | t.thread_variables |
线程组
默认情况下,所有线程都在默认的线程组中,这个默认线程组是Ruby程序启动时创建的。可使用ThreadGroup::Default
获取默认线程组。
1 | t1 = Thread.new do |
- 使用
ThreadGroup.new
可创建一个自定义的线程组 - 使用
tg.add(t)
可将线程t加入线程组tg,这将会从原来的线程组移除t再加入新组tg - 使用
tg.list
可列出线程组tg中的所有线程 - 使用
t.group
可获取线程t所属的线程组 - 子线程会继承父线程的线程组,即子线程也会加入父线程所在的线程组
1 | tg = ThreadGroup.new |
线程组还有一个功能:可使用tg.enclose
封闭线程组tg,封闭后的线程组将不允许内部线程移出加入其它组,也不允许外界线程加入该组,只允许在该组中创建新线程。使用tg.enclosed?
测试线程组tg是否已封闭。
其实,使用线程组可以将多个线程分类统一管理,线程组本质是一个线程数组加一些额外属性。比如,可以为线程组定义一些额外的针对线程组中所有线程的功能:wakeup组中的所有线程、join所有线程、kill所有线程。
1 | class ThreadGroup |