0%

word

第一个概念,word(字)。

word是cpu领域的一个重要概念,它被定义为cpu使用数据的一个自然单位(natural unit),cpu的很多数据长度都与其相关,比如:

  • 通常cpu的大多数寄存器长度为一个word。
  • 通常cpu最大寻址空间为一个word(即指针大小通常是一个word)。
  • 总线宽度通常为一个word,即cpu单次读写内存的量通常最大为一个word。
  • 很多cpu每条指令的长度也为一个word。

32位/64位cpu中的32位和64位就是指它的字长(word-length或word-size)为32位或64位。

这里不详细介绍不同cpu的word的具体含义。我们说一下word对内存读写的影响:

  • 早期的cpu通常只能沿着一个word的边界读写数据,如果一次读/写操作的目标地址不是字长的整数倍,cpu会报错。
  • 现代的X86 cpu可以在任意地址读写数据,但如果目标地址不是字长的整数倍,底层会将这次操作按word分界分成多个读写操作,对性能有明显影响。
  • X86-64 cpu可以在任意地址读写数据,且不会有明显的性能影响。

alignment

上图的第一个例子,我们按alignment存放了1个int(4字节数据),cpu只需要一次内存操作就可以完成存取。而第二个例子中,cpu需要两次内存操作来完成int的存取。

现代的cpu通常有多个字长概念(word、1/2word、1/4word等),针对不同的数据长度,可以有不同的字长。X86和X86-64对长度为1/2/4/8字节的数据,其字长也为1/2/4/8字节。

alignment和padding

第二个概念,alignment。

考虑到数据不按word边界存放可能引起的问题,编译器在排列变量时,会尽量将其按对应的字长来排列。这种行为就被称为对齐(alignment),而因为alignment导致的数据间产生未使用的空洞,则被称为填充(padding)。

1
2
3
4
5
6
7
void func() {
char c;
int16_t i16;
int32_t i32;
int64_t i64;
printf("c:%ld i16:%ld i32:%ld i64:%ld\n", &c, &i16, &i32, &i64);
}

在我的环境下(X86-64,gcc4.8.5),输出为:

1
c:140730763468703 i16:140730763468700 i32:140730763468696 i64:140730763468688

可以看到i16c之间有2字节的padding,i32i16之间有2字节的padding,i64i32之间有4字节的padding,实际上是这么排列的:

1
2
3
4
5
6
7
char c;
char padding0[2];
int16_t i16;
char padding1[2];
int32_t i32;
char padding2[4];
int64_t i64;

struct的alignment

struct的alignment规则很简单:

  • 空struct的size与alignment均为1。
  • 非空的struct,其alignment为各成员的alignment的最大值。其最后一个成员后面若有需要,也要padding。

注意第2条规则,会导致struct占用的空间比我们预期的更多,例子:

1
2
3
4
5
struct S {
char c0;
int64_t i64;
char c1;
};

它的自然大小为10B(1+8+1),但考虑到alignment的影响,真实大小却是24B(8+8+8)!

因此S的真实布局为:

1
2
3
4
5
6
7
struct S {
char c0;
char padding0[7];
int64_t i64;
char c1;
char padding1[7];
};

而当S作为其它struct的成员时,它的size和alignment分别是24和8,会影响到上层struct的alignment。

适当的重新排列S的成员,可以显著减小它的size:

1
2
3
4
5
struct S {
char c0;
char c1;
int64_t i64;
};

此时它的真实布局为:

1
2
3
4
5
6
struct S {
char c0;
char c1;
char padding[6];
int64_t i64;
};

S的size减小到了16B。

与alignment有关的编译器扩展

这里只介绍gcc的相关扩展。

gcc允许我们用__attribute__来修饰变量,其中用于改变alignment的有以下几种。

注意:修改alignment可能会影响ABI兼容性和可移植性,通常不推荐。

aligned

语法1:__attribute__ ((aligned (size_in_byte))),显式指定alignment。注意:指定比默认更小的alignment是无效的,会被编译器忽略。

1
2
3
4
5
6
7
8
9
int x __attribute__ ((aligned (16))) = 0;

struct S {
int x[2] __attribute__ ((aligned (8)));
};

struct R {
char c;
} __attribute__ ((aligned (8)));

语法2:__attribute__ ((aligned)),让编译器选择可能的最大alignment,对64位环境而言通常是8。

1
2
3
struct R {
char c;
} __attribute__ ((aligned));

packed

语法:__attribute__ ((packed)),表示该变量或struct选择可能的最小alignment,对64位环境而言通常是1。

下面这个struct,加上packed后其大小变为10,与其自然大小相等:

1
2
3
4
5
struct S {
char c0;
int64_t i64;
char c1;
};

编译选项

编译时如果加上-fpack-struct,则默认所有变量和struct都会按packed处理。

如果加上-Wpadded,则编译器增加padding的地方会有warning。注意只包含struct场景。

相关链接

前言

大体来讲,我们每个人头上都不会只有一项工作,很可能在某阶段,头上会有N项工作。我们当然希望这些工作能按部就班的、一项一项的顺序完成。但现实是残酷的,总有些工作会depend其它人,而不得不暂停下来;也总有些工作每天都有人催,需要尽快完成。所以,结论就是每个人都需要高并发工作,也需要知道怎么实现高并发工作。

有种观点是,编程中的所有概念都是人类活动的延续,反映了人类自己的思维方式与组织架构。这句话反过来说也不无道理,即人类的思维方式与组织架构,往往可以从编程中找到对应的概念。

某种程度上,实现高并发工作的方法与实现高并发编程是类似的。本文就参照高并发编程的一些要素,来分析一下如何达到高并发工作。

阅读全文 »

Future和Promise

异步编程

有没有某个时刻,你觉得你的程序可以分成多个部分,其中一些部分不需要等待其它部分运行结束?比如当程序发出一个http请求后,在它返回之前,程序似乎还可以做点别的事情;比如当程序在等待一个请求的response的序列化完成时,似乎它可以做下个请求的参数检查了。这时候,你就需要了解异步编程了。

当程序分成多部分,这些部分之间的消息通信就成了一件很重要的事情。通常我们将消息通信分成同步和异步两种,其中同步就是消息的发送方要等待消息返回才能继续处理其它事情,而异步就是消息的发送方不需要等待消息返回就可以处理其它事情。很显然异步允许我们同时做更多事情,往往也能获得更高的性能。尤其对于JavaScript这种通常是单线程环境的语言,更需要将长延时的阻塞操作异步化来保证其它操作的顺利进行。

异步编程的核心问题是如何处理通信:要么有办法知道通信有没有完成,要么能保证在通信完成后执行一段特定的逻辑。前者就是通知机制,比如信号量、条件变量等;后者就是callback,即回调。

回调噩梦

当一项任务需要分成多个异步阶段完成时,就需要在每个阶段的回调函数中加入下阶段回调的代码,最终产生下面这样金字塔形状的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
getData = function(param, callback){
$.get('http://example.com/get/'+param,
function(responseText){
callback(responseText);
});
}

getData(0, function(a){
getData(a, function(b){
getData(b, function(c){
getData(c, function(d){
getData(d, function(e){
// ...
});
});
});
});
});

可以想象当回调层次继续增加时,代码有多恐怖。这就是回调噩梦。

Future和Promise

Future指一个只读的值的容器,这个值可能立即可用,也可能在未来某个时间可用。而Promise则是一个只能写入一次的对象。每个Promise关联一个Future,对Promise的写入会令Future的值可用。我们只讨论Promise和Future一对一的场景,在这个场景中Future就是值,而Promise是产生值的方法。

Future和Promise来源于函数式语言,其目的是分离一个值和产生值的方法,从而简化异步代码的处理。

通知机制

Future与Promise配合起来可以实现一种可靠的通知机制,即我们可以异步执行一个方法,通过返回的Future来知道异步方法何时结束、是否成功、返回值是什么。

1
2
3
4
5
6
7
8
9
10
11
12
// 调用方
void SyncOperation() {
Promise<int> promise;
RunAsync(std::bind(AsyncFunc, promise));
Future<int> future = promise.GetFuture();
int result = future.Get(); // wait until future is done
}
// 接收方
void AsyncFunc(Promise<int> promise) {
// do something
promise.Done(result);
}

链式回调

Promise的一个重要特性就是它支持then,可以将金字塔式的回调组织为链式,极大地降低了理解和维护的难度:

1
2
3
4
5
6
7
8
9
10
11
12
13
getData = function(param, callback){
return new Promise(function(resolve, reject) {
$.get('http://example.com/get/'+param,
function(responseText){
resolve(responseText);
});
});
}

getData(0).then(getData)
.then(getData)
.then(getData)
.then(getData);

Async和Await

C#在5.0之后支持了asyncawait关键字,允许写出这样的代码:

1
2
3
4
5
6
7
8
9
10
async Task<int> AccessTheWebAsync()  
{
HttpClient client = new HttpClient();
Task<string> getStringTask = client.GetStringAsync("http://msdn.microsoft.com");
DoIndependentWork();
string urlContents = await getStringTask;
return urlContents.Length;
}

string urlContents = await client.GetStringAsync();

其中async要求函数必须返回TaskTask<T>,这里的Task可以理解为一种Future。用async修饰函数表明这是个可异步执行的函数,而用await会等待Future结束,返回Future的值,将异步又转成了同步。

上面js的例子用await来实现就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
getData = async function(param, callback){
return new Promise(function(resolve, reject) {
$.get('http://example.com/get/'+param,
function(responseText){
resolve(responseText);
});
});
}

var data = await getData(0);
var data1 = await getData(data);
var data2 = await getData(data1);
var data3 = await getData(data2);
var data4 = await getData(data3);

这种写法要比Promise链更接近同步,也更易懂,但其底层依然是Promise。这种写法很接近于协程:用Promise来实现yield和resume,它就是一种协程。

不同语言中的Future和Promise

C++

C++11中增加了std::futurestd::promise,基本是按照Future只读、Promise只写来设计的。它的缺点是:

  1. 其实现绑定了std::thread,很难扩展到其它执行器上。
  2. Promise不支持链式回调。

C#

C#的Task就类似于Future,它的asyncawait也很方便。

Java

Java之前就有Future,类似于C++11的std::promise,没有链式回调能力。Java8中增加了CompletableFuture,可以认为是一个完全的Promise了。

JavaScript

ES6开始,JavaScript增加了Promise、async、await等特性,极大改善了JS代码中维护回调难的问题。

Python

Python3.5之后增加了对asyncawait的支持。

Scala

Scala中的FuturePromise完全符合上面的介绍,它的一个特点是一个Future可以增加多个回调,但不保证这些回调的执行顺序。

相关链接

Persistent Data Structure

Persistent Data Structure,直译就是“持久性数据结构”。根据Wiki定义,如果一种数据结构,能保留它每一次被修改前的版本,就可以被称为“持久性数据结构”。而与之相对的就是Ephemeral Data Structure,即“暂存性数据结构”。

C++中,我们熟悉的数据结构大多数都属于暂存性数据结构,比如std::vector,当我们修改了一个std::vector,与之相关的所有已存在的Iterator就失效了,不能再访问了。而如果我们修改了一个持久性数据结构,我们仍然能够访问到它修改之前的版本,比如Iterator(假如有的话)不会失效,不会读到修改后的数据等。从这个角度讲,持久性数据结构也是不可变(Immutable)的。

什么地方需要用到持久性数据结构?

  1. 函数式编程语言。它的定义就要求了不能有可变数据和可变数据结构。
  2. 并发编程。

(另外,使用Persistent Map/HashMap有助于简化Prototype的实现,也算是一个用途。)

持久性数据结构对Lazy Evaluation也很有帮助:如果一个数据结构是可变的,我们肯定不会放心对它使用Lazy Evaluation。

Persistent Data Structure与并发

并发编程的核心问题是竞争,即不同线程同时访问相同数据。对此我们往往需要用到一些同步手段来保证临界区的原子性,即同时只有一个线程能读出或写入相同的共享数据。

最常见的同步手段就是锁,但锁也会引起一大堆的问题:

  • 死锁。
  • 活锁。
  • 临界区太大导致性能低下。
  • 错误地在临界区外访问数据导致数据竞争。
  • 控制反转。

另一种同步手段基于原子操作,从而实现出一套lock-free的数据结构。它的问题在于:

  • 原子操作本质上也是锁(总线锁),因此高并发度时开销还是会很大。
  • 需要非常精细的实现一个lock-free的数据结构,维护难度大,且很难证明其正确性。一些久经考验的数据结构仍然可能存在bug。

而当我们在不同线程间访问相同的持久性数据结构时,我们很清楚其中不会有任何的数据竞争,因为无论其他线程如何修改这个结构,当前线程看到的结构永远是不变的。这不需要异常复杂的实现和同样复杂的测试来保证。

可以认为通过锁和原子操作实现的并发数据结构追求的是“没有明显的错误”,而持久性数据结构则是“明显没有错误”。

而从性能角度,持久性数据结构也并非一定处于劣势。实现良好的持久性数据结构,通常都可以提供一种或多种操作,其时间和空间复杂度与对应的暂存性数据结构相同。且编译器针对持久性数据结构的不变性,往往能给出更优化的目标代码。

如何实现Persistent Data Structure

想要实现一种持久性数据结构,最简单的方法就是“Copy Anything”,即每当我们修改原结构时,实际上我们都创建了一个副本,再在副本上修改。

当然这种方法的缺点也是很明显的:开销太大。

第二种方法是修改时不创建数据的副本,而是保存每次对结构的修改操作,当需要读取的时候再创建数据的副本,再在其上应用每个操作。显然这种方法在读取时的开销非常大。一种改进方案是每K个修改操作创建一次数据副本。

第三种方法是路径复制(Path Copy),即对于基于Node的数据结构,当我们进行修改时,我们会复制路径上经过的Node,直到最终修改发生的Node。这里我们用“构造”代替了“修改”。

目前最常用的实现方法就是路径复制。

垃圾回收与引用计数

持久性数据结构的一个核心思想是为当前每个持有的人保留一个版本,即对于相同的数据可能同时存在多个版本。这样我们就需要有垃圾回收机制,对于每个版本的数据,在没有人持有之后回收掉。

对于C++而言,通常引用计数就足够了。因为持久化数据结构有一个特点:它的引用链一定是无回路的,只有新对象引用老对象,不可能有老对象引用新对象,因此单纯的引用计数就可以完全回收掉所有垃圾数据。

同时,这个特点对Java类的分代GC也是很有好处的。

有位老同志指出:对于非特定的基于引用计数的数据结构,不能使用std::shared_ptr,原因是它的析构是链式递归进行的,C++的编译器不一定能去掉所有的尾递归,可能会打爆栈。一种解法是从Root节点开始,收集后面所有的节点,然后循环析构。

为什么std::map可以递归析构?因为平衡树的析构链长度为lgn,假设系统的栈深度上限为100,需要std::map中有2^100个元素以上才会栈溢出,此时内存早就爆掉了。

Persistent List

Persistent List可能是最简单的持久性数据结构。它支持两种基本操作:构造、插入头节点,即:

1
2
3
4
5
6
7
template <typename T>
class List {
public:
List();
List(T val, List tail);
...
};

这里我们使用的实现方法就是路径复制:当我们插入或删除或修改List的第N个节点时,我们需要复制前N-1个节点。

因此,插入或删除头节点都是O(1)的开销,而任意位置插入或删除节点则是O(n)的开销。

基本操作

以下是List的一个最简单实现,包括了支持的几个O(1)操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename T>
class List {
struct Item {
Item(const T& v, const Item* tail) : mValue(v), mNext(tail) {}
T mValue;
const Item* mNext;
};
explicit List(const Item* item): mHead(item) {}
public:
List(): mHead(nullptr) {}
List(const T& val, List tail): mHead(new Item(val, tail.mHead)) {}

bool Empty() const {
return mHead == nullptr;
}

const T& Front() const {
assert(mHead);
return *mHead;
}

List PushFront(const T& val) const {
return List(val, mHead);
}

List PopFront() const {
return List(mHead->mNext);
}
private:
const Item* mHead;
};

可以看到这个实现中还没有任何引用计数有关的代码。我们有两种方式来实现引用计数:

  1. 使用std::shared_ptr,这样只需要把每个使用const Item*的地方都换成std::shared_ptr<const Item>即可。
  2. 自己实现一种侵入式的引用计数基类Reference,并令Item继承自它。

其它列表操作

当我们拥有了一个持久性的List后,我们就可以在其上实现一套函数式的操作:

  • fmap
  • filter
  • foldl/foldr
  • forEach

实现戳这里

Persistent Map

Persistent Map的实现也是基于路径复制的。算法导论上有这么一道习题,清晰的体现了它的实现思路:

Persistent Map

当我们插入或删除一个节点时,路径上的每个节点都需要被复制一次,因此这样的操作的开销是O(lgn)的。

插入操作是很简单的:

1
2
3
4
5
6
7
8
9
10
11
Node* Insert(Node* root, const T& val) {
if (!root) {
return new Node(val, nullptr, nullptr);
} else if (val < root->mValue) {
return new Node(root->mValue, Insert(root->mLeft, val), root->mRight);
} else if (val > root->mValue) {
return new Node(root->mValue, root->mLeft, Insert(root->mRight, val));
} else {
return root;
}
}

删除操作有些麻烦,需要处理删除的节点有子节点的情况。原则就是:用构造代替所有的修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Node* Delete(Node* root, const T& val) {
if (!root) {
return nullptr;
} else if (val < root->mValue) {
return new Node(root->mValue, Delete(root->mLeft, val), root->mRight);
} else if (val > root->mValue) {
return new Node(root->mValue, root->mLeft, Delete(root->mRight, val));
} else {
if (!root->mLeft) {
return root->mRight;
} else if (!root->mRight) {
return root->mLeft;
} else {
Node* leftMost;
tie(leftMost, root->mRight) = RemoveLeftMost(root->mRight);
return new Node(leftMost->mValue, root->mLeft, root->mRight);
}
}
}

注:以上代码均未考虑垃圾回收。

对于红黑树而言,每次它平衡时,同样要复制涉及到的节点。每次平衡最多涉及3个节点,因此平衡的开销是O(1)。

实现戳这里。不过这个实现没有删除功能。

Persistent Vector

Persistent Vector的基本思想与Persistent Map非常接近:你把Persistent Vector想象成以下标为Key的Map即可:

Basic Persistent Vector

图中就是一个这样的Map,它有如下特点:

  1. 分为内部节点和叶节点两类,其中数据都在叶节点上。
  2. 所有叶节点的高度相同。
  3. 每个内部节点有两个子节点。
  4. 每层节点中,只有最后一个节点有可能是半满的,其它节点都是满的。

当然这样的实现效率是很低的,每次Update、PushBack的时间复杂度是O(lgn)。因此实践上我们会使用类似于Trie树的结构,即每个节点能容纳多于两个子节点,通常为32个,这样一个6层的Map就可以容纳最多1073741824个元素,每次Update、PushBack最多需要复制6个节点,可以认为变成了一次O(1)操作。

查找:

1
2
3
4
5
6
7
8
const T& Get(Node* root, int32_t index) {
while (!root->IsLeaf()) {
const int32_t mask = root->Mask();
root = root->Get(index / mask);
index %= mask;
}
return *(const T*)root->Get(index);
}

保留Tail节点来提高PushBack性能

想想一个Persistent Vector执行PushBack的时候都发生了什么?从Root开始,一直复制到Tail叶节点。原因是当我们复制了一个子节点,我们就需要修改它的父节点,根据“修改即构造”原则,就需要重新构造一个父节点,从而一直到Root都要构造一遍。

那么,假如Persistent Vector直接持有Tail节点呢?这样当它未满时,PushBack只要复制两个节点:Tail节点和Vector本身。当Tail节点已满时,我们才需要真正做一次PushBack。对于32个子节点的Persistent Vector来说,有31/32的PushBack是真正的O(1)操作,其它1/32的PushBack才需要O(lgn)。

Tail

新的查找也很简单。为了配合查找,我们要记录下Tail节点的offset:

1
2
3
4
5
if lookup_index < tail_offset:
# tree_lookup = old lookup technique
return tree_lookup(lookup_index)
else:
return tail[lookup_index - tail_offset]

Transient

当我们要对Persistent Vector进行一系列修改时,每次Update/PushBack/PopBack都要复制lgn节点,即使有了Tail节点优化,也只能对PushBack和PopBack有一定的效果。问题在于,我们即使每个操作都复制了一个新节点,下次操作这个节点时还是要再复制,因为我们要保证不破坏其它人的使用。那么什么时候可以直接重用这个节点,不用复制呢?

  1. 只能被当前Vector访问到的节点。
  2. 当前Vector在这次操作后不会被其它人使用。

对于条件1,很好解决:我们每次修改Vector时都申请一个UUID,并放到这次修改创建的节点中,这样通过UUID就能判断节点是不是当前Vector创建出来的。

对于条件2,我们没有办法阻止用户在所有修改完成之前使用这个Vector。因此我们使用一个新类型Transient来进行批量修改。好处:显式的让用户知道,我们要做一些不一样的事情啦,在它结束之前不要访问这个Vector。

在批量修改结束之后,还需要一个操作来把Transient变为Persistent,这步操作会把每个节点中的ID置为NULL,保证合法的Vector的节点没有ID,从而避免一个Transient被误用。

Persistent HashMap

当我们有了一个Persistent Vector的实现之后,实现一个Persistent HashMap也就不困难了。

一个HashMap是什么?如果我们使用分桶链表来实现HashMap,它就是一个Vector,其中每个元素是一个List。那么我们可以用Persistent Vector + Persistent List来实现Persistent HashMap。如果我们使用开放散列法来实现HashMap,它就是单纯的一个Vector,只要用Persistent Vector来实现就可以了。

Persistent Data Structure的一个使用场景

想象我们有一个UserMap,保存每个UserID和对应的NickName。有两种操作:

  • NewUser:增加一对新的UserID和NickName。
  • Update:修改已有UserID对应的NickName。

现在我们用一个Persistent HashMap作为UserMap。有若干个前台线程可以访问和修改UserMap。

最简单的做法:

  1. 前台线程获得当前UserMap,称为M0。
  2. 前台线程修改UserMap,得到M1。
  3. 前台线程将M0替换为M1,原有的M0析构。

只有一个前台线程时,这种用法是没什么问题的。但当我们有多个前台线程时,就会有问题:

  1. 线程T1获得M0。
  2. 线程T2获得M0。
  3. 线程T1修改M0,得到M1。
  4. 线程T2修改M0,得到M2。
  5. 线程T1将M0替换为M1。
  6. 线程T2将M1替换为M2。

结果就是丢了一次数据!实际上这就是一个很经典的RMW操作,先Read,再本地Modify,再Write写回。对于RMW操作,必不可少的操作就是CompareAndExchange,也就是在Write时比较一下原对象有没有在你的Read之后被修改过。如果有的话,需要重试整个RMW操作。

在我们这个场景中,我们需要做的就是每个线程在替换UserMap时确认一下UserMap的最后修改时间是否与自己手上持有的M0的修改时间相同,如果相同才能完成替换,否则就整个操作重来。这种先确认数据是否冲突再写入的操作,就是一种很典型的Transaction。因此我们可以使用STM(Software Transactional Memory)来更规范的使用UserMap:

  1. 线程创建Transaction t。
  2. 线程通过t获得M0。
  3. 线程通过t修改M0,得到M1。
  4. 线程通过t替换UserMap为M1。

这个过程中,如果有数据冲突,根据STM的实现不同,可能在不同的地方失败,只有当没有数据冲突时,整个操作才能顺利走下去。

可以看到,当我们使用Persistent Data Structure时,数据冲突的概率决定了它的使用是否高效。上面的例子中,对于全局唯一的UserMap,如果有大量的修改操作同时进行,那么其中只会有非常少量的操作能成功,其它操作都会因为数据冲突而失败。那么我们在使用时,就要考虑能否减少UserMap的粒度,从而降低冲突概率,提高性能。比如,我们将UserMap分成4096个桶,每个桶是一个Persistent HashMap,那么冲突概率就会小很多,整体性能就上去了。

当然,在分桶后,如果有涉及多个UserID的操作,我们就需要一次性原子的替换多个UserMap。这不是一件容易的事情,幸好,STM就是干这个的。这也说明Persistent Data Structure和STM确实是好朋友。

相关链接

Type Erasure,直译就是“类型擦除”。什么时候需要擦除类型?当我们想令一些代码具备多态性质时,我们往往没办法保留对象本身的类型,而需要用一种通用的类型去使用它们,这个时候,就需要擦除对象原有的类型。

Type Erasure的几种形式

void*

在C语言中,很多通用算法函数都会使用void*作为参数类型,比如qsort,它的原型是:

1
void qsort (void* base, size_t num, size_t size, int (*compare)(const void*,const void*));

为了使qsort有处理多种类型的能力,它只能把参数类型设为void*,这样我们可以用同一个qsort函数,处理各种各样的类型。代价就是对象原有的类型被擦除了,我们只能看到void*

这种方法的缺点是,它不能保证类型安全。当我们擦除了一个对象的类型后,总会在某个时刻需要把它再找回来的。在qsort中,我们总是需要能拿到对象的正确类型的,才能进行正确的排序。而这个工作是通过compare完成的:

1
2
3
4
5
6
7
int int_compare(const void* a, const void* b) {
return *(const int*)a - *(const int*)b;
}

int str_compare(const void* a, const void* b) {
return strcmp((const char*)a, (const char*)b);
}

假设我们传递了错误的compare,谁能知道这件事?编译器不知道,因为你把类型擦除掉了。你自己也不知道,因为代码就是你写的。测试程序可能知道,也可能不知道,因为这个时候程序的行为是未定义的。

继承

在面向对象语言中,继承是最常见的Type Erasure。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
interface Counter {
public void Increase(int v);
public void Decrease(int v);
public int Value();
};
...
public class Test {
public static void down(Counter c) {
int count = 0;
int oldValue = c.Value();
while (c.Value() != 0) {
c.Decrease(1);
count++;
}
Assert.assertEqual(count, oldValue);
}
};

Test.down中,我们只知道c的类型是Counter,但不知道它是哪个实现类型,这里它的类型就被擦除了。

继承当然是比void*要好的,因为我们操作对象时调用的是对象具体的实现API,换句话说,我们只擦除了调用处对象的类型,实际上它并没有丢掉自己的类型,也保证了类型安全性。

继承的问题在于,它的侵入性,即它要求每个实现类型都继承自某个基类。在很多情况下,这是很难做到的,或者是很别扭的。

比如说RedApple,一个红色的苹果,当我们想使用“红色”这个泛型概念时,它需要实现Red这个接口;而当我们想使用“苹果”这个概念时,它又需要实现Apple这个接口。某天当我们想使用“类球形”这个概念时,它又要实现RoundLike接口吗?

当接口一个又一个的出现时,有人会说,干脆我们到处传递Object吧,用的时候再down_cast成具体的类型。于是我们又回到了void*的时代。

尤其是,有些类型我们是没有办法改的,比如三方库中定义的类型,比如内置类型。这些情况下,继承就无能为力了。

Duck Typing和Template

如果一个东西,走路像鸭子,叫声也像鸭子,那么它就是鸭子。换句话说,如果一个东西,满足我们对鸭子的所有要求,那么它就是鸭子。如果一个T,满足我们对X的所有要求,那么它就是X。这就是duck typing,即鸭子类型。

Python中大量应用了duck typing:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class RedApple:

def color(self):
return 'red'

def round_like(self):
return True

def map_by_color(items):
ret = defaultdict(list)
for item in items:
ret[item.color()].append(item)
return ret

color_map = map_by_color([..., RedApple(), ...])

map_by_color中,我们对items有两项要求:

  1. 可遍历。
  2. 其中每个元素都有color方法。

但不要求items或其中每个item继承自哪个特定的接口。

这也是Type Erasure,但明显比继承来得更自由。当然自由都是有代价的,duck typing的代价就是它的运行时性能损失。Python中每个对象都会保留自己的类型信息,在调用时进行动态绑定。Go的interface有着类似的用法,也有着类似的优缺点。

C++的模板也是一种duck typing:

1
2
3
4
5
6
7
8
9
10
template <typename C>
int CountByColor(const C& container, Color color) {
int count = 0;
for (const auto& item: container) {
if (item.Color() == color) {
++count;
}
}
return count;
}

这里面有个模板参数C,我们对它的要求是:

  1. 可遍历,具体来说是支持begin(container)end(container)两种API。
  2. 遍历出来的每个元素有T Color() const方法,且TColor类型有合适的operator==函数存在。

所有满足这个条件的C都可以作为CountByColor的参数类型。

当然C++的模板与Python的duck typing还是有很大区别的,因为它并没有真的擦除掉元素类型:CCountByColor原型的一部分。这样我们其实一直都保留着元素的具体类型信息,好处:

  1. 完整的类型安全性,没有任何环节丢掉了类型信息。
  2. 因此不需要动态绑定,所有环节都是静态的,没有运行时性能损失。

但也有坏处:

  1. 模板类型会作为模板函数或模板类的原型的一部分,即vector<int>vector<double>是两个类型,没办法用一个类型来表示,也就没办法实现出上面Python例子中的map_by_color函数。
  2. 每次用不同的参数类型来实例化模板时,都会新生成一份代码,导致编译出来的二进制文件很大。

C++中结合继承与Template的Type Erasure

在C++中我们可以结合继承与Template,实现出一种Type Erasure,它既有duck typing的优点,又可以将不同类型用同一种类型表示。

假设我们现在要重新设计上面的Counter接口,首先我们定义一个内部的基类,Counter的每个方法都对应它的一个虚函数:

1
2
3
4
5
6
7
class CounterBase {
public:
virtual ~CounterBase {}
virtual void Increase(int v) = 0;
virtual void Decrease(int v) = 0;
virtual int Count() const = 0;
}

接下来我们使用模板实现一个通用的子类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename T>
class CounterImpl: public CounterBase {
public:
explicit CounterImpl(T t): mImpl(std::move(t)) {}
void Increase(int v) override {
mImpl.Increase(v);
}
void Decrease(int v) override {
mImpl.Decrease(v);
}
int Count() const override {
return mImpl.Count();
}
private:
T mImpl;
};

最后我们还要定义一个Counter类型,但它不需要有任何的虚函数,也不需要作为任何类型的基类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Counter {
public:
template <typename T>
Counter(T t): mPtr(new CounterImpl(std::forward<T>(t))) {}
void Increase(int v) {
mPtr->Increase(v);
}
void Decrease(int v) {
mPtr->Decrease(v);
}
int Count() const {
return mPtr->Count();
}
private:
std::unique_ptr<CounterBase> mPtr;
};

然后我们就可以使用Counter来表示所有满足条件的类型了:

1
2
3
Counter c1(ClassA{});
Counter c2(ClassB{5});
Counter c3 = ClassC{3, 6};

对于没有IncreaseDecreaseCount接口的类型,比如内置类型int,我们还可以特化模板来满足要求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template <>
class CounterImpl<int>: public CounterBase {
explicit CounterImpl(int v): mValue(v) {}
void Increase(int v) override {
mValue += v;
}
void Decrease(int v) override {
mValue -= v;
}
int Count() const override {
return mValue;
}
private:
int mValue;
};

然后我们就可以写:

1
Counter c = 5;

是不是很赞?

C++中Type Erasure的例子

std::shared_ptr

我们知道std::shared_ptr的Deleter不是std::shared_ptr类型的一部分(参见为什么unique_ptr的Deleter是模板类型参数,而shared_ptr的Deleter不是),这给使用者带来了很多好处(相比std::unique_ptr):

  • 对于std::shared_ptr<T>,使用者不需要知道T的完整类型(当然创建者需要)。
  • 两个std::shared_ptr<T>对象类型相同,可以相互赋值,即使它们的Deleter类型不同。
  • 销毁T使用的Deleter永远是来自创建std::shared_ptr的编译单元,跨DLL和so使用时不会有销毁问题。

它的秘诀就是Type Erasure。参考clang的实现,std::shared_ptr只有两个成员变量:

1
2
3
4
5
6
template <typename _Tp>
class shared_ptr {
private:
_Tp* __ptr__;
__shared_weak_count* __cntrl__;
};

其中__shared_weak_count的定义为:

1
class __shared_weak_count;

可以看到不包含Deleter的类型。实际上构造的类型是它的子类__shared_ptr_pointer

1
2
template <class _Tp, class _Dp, class _Alloc>
class __shared_ptr_pointer: public __shared_weak_count;

具体的实现略。可以看到这里就使用了我们上面提到的继承与Template结合的方法。

std::function

std::function中使用了一个基类__base

1
2
3
4
5
6
template<class _Rp, class ..._ArgTypes>
class __base<_Rp(_ArgTypes...)> {
public:
...
virtual _Rp operator()(_ArgTypes&& ...) = 0;
};

同样地,具体的子类通过模板来保留类型信息,而通过基类来实现统一的存储与调用。

boost::any

boost::any是非常典型的应用了Type Erasure方法的类型。它允许你用一种类型来保存任何类型的对象,且通过type_info方法返回具体的对象类型。这样我们可以使用一个boost::any的容器保存任意类型的对象。它的实现很短,只有313行,很值得看一下。

std::any

std::any是C++17引入的新类型,与boost::any的接口几乎完全相同,区别在于,它使用了SBO(Small Buffer Optimization)方法,可以把小对象直接构造在类型内部,性能更好。

基于Type Erasure实现Unified Call Syntax

假设我们想实现一个接口类型Fooable,它有一个方法foo,使得Fooable::foofoo(Fooable)都可以用来表示T::foofoo(T)两种调用方式,即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct Member {
void foo();
};

struct NonMember{};
void foo(const NonMember&);

Fooable member_erased {Member{}};
foo(member_erased);
member_erased.foo();

Fooable non_member_erased {non_member{}};
foo(non_member_erased);
non_member_erased.foo();

第一步我们先定义基类:

1
2
3
class Storage {
virtual void call() = 0;
};

第二步定义模板子类:

1
2
3
4
5
6
7
8
9
10
template <typename T, bool HasMemberFoo = has_member_foo<T>::value>
struct StorageImpl: Storage {
T m_t;

StorageImpl (T t): m_t {std::move(t)} {}

void call() override {
m_t.foo();
}
};

其中has_member_foo是用来判断T::foo是否存在的辅助类型:

1
2
3
4
5
template <typename T, typename = void>
struct has_member_foo: std::false_type{};

template <typename T>
struct has_member_foo<T, std::void_t<decltype(std::declval<T>().foo())>> : std::true_type{};

T::foo()是个合法的表达式时,has_member_foo<T>::value就是true,否则就是false

然后我们为false准备一个特化版本:

1
2
3
4
5
6
7
8
9
10
template <typename T>
struct StorageImpl<T, false> : Storage {
T m_t;

StorageImpl (T t) : m_t {std::move(t)} {}

void call() override {
foo(m_t);
}
};

最后实现Fooable类型:

1
2
3
4
5
6
7
8
9
10
11
12
class Fooable {
public:
template <typename T>
Fooable (T t)
: m_storage {std::make_unique<StorageImpl<T>>(std::move(t))}
{}
void foo() { m_storage->call(); }
private:
std::unique_ptr<Storage> m_storage;
};

void foo(Fooable& f) { f->foo(); }

什么时候使用Type Erasure

简单来说,如果你有下面两个需求,你可能是需要Type Erasure的:

  • 你需要用同一种方式处理不同的类型。
  • 你需要用同一种类型或容器保存不同类型的对象。

然而在很多情况下,你可能只需要用std::shared_ptrstd::function就能达到这个目的,这个时候就不需要自己实现Type Erasure了。

相关文档

本章介绍一种通用技术(传值调用)和一种通用特性(原地构造),它们都受到多种因素的影响,作者能给的建议只是“考虑用一下”,实践上要根据具体情况来定。

Item41:对于可复制的、移动非常廉价、总是复制的参数,考虑调用时传值

有些函数参数就是要被复制的:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Widget {
public:
void addName(const std::string& newName) {
names.push_back(newName);
}

void addName(std::string&& newName) {
names.push_back(std::move(newName));
}
...
private:
std::vector<std::string> names;
};

上面的两个函数一个处理左值,一个处理右值,但实际上它们的逻辑都是一样的,但我们写了两个函数,两个实现。

假如你想用普适引用来代替上面两个函数:

1
2
3
4
5
6
7
8
class Widget {
public:
template <typename T>
void addName(T&& newName) {
names.push_back(std::forward<T>(newName));
}
...
};

代码省掉了一份,但又导致了其它问题。作为模板函数,addName需要放到头文件里。而且它不一定只有两个实例化版本(左值和右值),所有可以用于构造std::string的类型都可能会实例化一个版本(参见Item25)。同时,还有一些参数类型没办法使用普适引用(参见Item30)。如果调用方传递错类型,编译错误信息会非常恐怖(参见Item27)。

一种方法可以让我们只写一个函数,且没有普适引用的各种问题:参数直接传值,不使用引用:

1
2
3
4
5
6
class Widget {
public:
void addname(std::string newName) {
names.push_back(std::move(newName));
}
};

这个版本中,我们知道:

  1. newName与实参没有关系,因此如何修改newName都不会影响到实参。
  2. 这是newName最后一次被使用,因此移动它不会影响到后面程序的运行。

我们只需要写一个函数,因此:

  1. 避免了代码重复,包括源代码和目标代码。
  2. 没用普适引用,因此不会污染头文件,不会导致奇怪的运行失败或编译错误。

但它的开销如何?

当实参是左值时,实参到形参newName会有一次复制。但当实参是右值时,newName的构造会使用移动构造函数,因此它的构造开销是一次移动。后面构造names中的元素时,无论实参是左值还是右值,都是一次移动。

因此上面的方法中,当实参是左值时,开销是一次复制+一次移动;当实参是右值时,开销是两次移动。对比原来的重载版本,当实参是左值时,开销是一次复制;当实参是右值时,开销是一次移动。因此传值方法会比重载方法多一次移动的开销。

对于普适引用版本,情况有点复杂。当T是可以用于构造std::string时,普适引用在实参到形参中不会有对象构造,而是直接使用实参去构造names中的元素。本节我们不考虑这种情况,只假设实参是std::string,那么普适引用版本的开销与重载版本相同。

回头看一下标题,“对于可复制的、移动非常廉价、总是复制的参数,考虑调用时传值”:

  1. 你只能是考虑要不要用传值方法。它确实有很多优点,但它也确实比其它版本多一次移动的开销。一些场景下(后面会讨论),这次开销不可忽视。

  2. 只能对可复制的参数使用传值方法。对于只能移动的类型,我们只能移动构造形参,就不存在需要写两个重载版本的问题,也就不需要使用传值方法了:直接传右值引用多简单。

  3. 传值方法只适用于“移动非常廉价”的类型。

  4. 只有当参数的复制不可避免时,才需要考虑传值方法。假如有某个分支下我们不需要复制参数,那么重载版本就不需要复制参数,而传值版本在调用那一刻已经复制完了,没办法省掉。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    class Widget {
    public:
    void addName(std::string newName) {
    if ((newName.length() >= minLen) && (newName.length() <= maxLen)) {
    names.push_back(std::move(newName));
    }
    }
    ...
    };

即使上面三个条件都满足,也有场景不适用于传值方法。我们说复制时,不光是复制构造,还有复制赋值。考虑到这点,开销分析就更复杂了。

1
2
3
4
5
6
7
8
9
class Password {
public:
explicit Password(std::string pwd)
: text(std::move(pwd)) {}

void changeTo(std::string newPwd) {
text = std::move(newPwd);
}
};

构造Password显然是可以用传值方法的:

1
2
std::string initPwd("Supercalifragilisticexpialidocious");
Password p(initPwd);

但在调用changeTo时:

1
2
std::string newPassword = "Beware the Jabberwock";
p.changeTo(newPassword);

newPassword是左值,因此newPwd要进行复制构造,这里会分配内存。之后newPwd移动赋值给text时,text会释放自己原有的内存块,转而使用newPwd持有的内存块。因此changeTo有两次内存操作,一次分配,一次释放。

但我们这个例子中,旧密码比新密码长,因此如果我们使用重载方法,就不会有内存分配或释放(直接复制到旧密码的内存块上):

1
2
3
void Password::ChangeTo(const std::string& newPwd) {
text = newPwd;
}

因此在这个例子中,传值版本比重载版本的开销多了两次内存操作,很可能比字符串的移动开销大一个数量级。

但在旧密码比新密码短的例子中,重载版本也没办法避免掉两次内存操作,这时传值方法的优势又回来了。

以上分析只针对实参为左值的情况,当实参为右值时,移动总是更好的。

由此可以看出,当有赋值时,可能影响结论的因素太多了,比如Password这个例子中std::string是否使用了SSO优化也会影响我们的结论。

实践中通常采用“有罪推定”原则,即优先使用重载方法或普适引用方法,直到传值方法显示出它的优势。对于那些对性能有极致要求的软件,传值方法就不太合适了。首先,多出的一次移动的开销可能很重要;其次,很难确定到底多了几次移动。假设我们构造链路上有N层,有可能每层的构造都使用了传值方法,看起来简单的一次构造实际上多了N次移动的开销。而且我们还很难发现这件事。

传值方法的另一个问题,是当有继承的时候,传值可能引发“切片问题”。当形参是基类而实参是派生类型时,实参到形参的构造会丢掉派生类型多出的部分,最终只得到一个基类对象。而传引用就不会有这个问题。这也是C++98中传值不被接受的一个原因。

Item42: 考虑用原地构造替代插入

假设我们有一个容器,元素类型是std::string。当我们向这个容器插入一个新元素时,新元素的类型是什么?直觉告诉我们,新元素的类型就是std::string

但直觉不总是对的。看下面的代码:

1
2
std::vector<std::string> vs;
vs.push_back("xyzzy");

这里,我们插入的新元素类型不是std::string,而是char[6]char*std::vector有两个版本的push_back

1
2
3
4
5
6
7
8
template <typename T, class Allocator = allocator<T>>
class vector {
public:
...
void push_back(const T& x);
void push_back(T&& x);
...
};

当编译器发现实参类型与形参类型不匹配时,它会生成一些代码,构造一个临时的std::string对象,效果类似于:

1
vs.push_back(std::string("xyzzy"));

整个过程为:

  1. 构造临时对象。
  2. std::vector分配空间给新元素。
  3. 将临时对象复制到新的空间上。
  4. 析构临时对象。

于是这里偷偷的多了一次对象的构造和析构。另外还有一次临时对象的复制。当我们很关心性能时,这些额外开销是不可忽视的。

C++11新增的emplace_back方法就可以避免这个问题:

1
2
vs.emplace_back("xyzzy");
vs.emplace_back(50, 'x');

它会先分配空间,再在新空间上使用传入参数直接构造出std::string。每个支持push_back的容器也都支持emplace_back,支持push_front的容器也都支持emplace_front,支持insert的容器也都支持emplace

一般来说insert和emplace的效果是完全相同的,同时emplace还省掉了临时对象的构造和析构,那么还有什么情况下我们不用emplace呢?

目前的C++标准库实现中,既有emplace比insert快的场景,也有emplace比insert慢的场景。这些场景很难列举,取决于传入的参数类型、使用的容器、新元素所处的位置、元素的构造函数的异常安全性,以及对于map和set类容器,要插入的元素是否已经存在等因素。因此在决定使用insert还是emplace前,先测一下性能。

当然也有些启发式的方法来判断emplace适用于哪些场景。以下条件如果为真,emplace就很可能比insert性能更好:

  • 新元素在容器内直接构造,而不是先构造再赋值。

    在前面的例子中,我们要在vs的尾部新增一个元素,显然这里之前不存在对象,我们只能构造一个对象。emplace此时就比较有优势。但下面这个例子中:

    1
    2
    3
    std::vector<std::string> vs;
    ...
    vs.emplace(vs.begin(), "xyzzy");

    我们要在vs的头部新增一个对象。大多数实现会先用""xyzzy"构造出一个临时的std::string,再移动赋值给目标对象。这样emplace相比insert的优势就没有了。

    当然,这取决于我们用的实现。但此时启发式方法还是有用的。理论上基于节点的容器都会构造新元素,而大多数STL容器都是基于节点的。只有几个容器不基于节点:std::vectorstd::dequestd::stringstd::array基于节点,但它没有emplace和insert类的方法)。当你明确知道新元素会被构造出来时,就可以毫不犹豫的使用emplace。这三个容器的emplace_back都是推荐用的,对于std::deque来说,emplace_front也推荐使用。

  • 实参类型与容器的元素类型不同。(解释略)

  • 容器不会因重复元素而拒绝插入。这里说的是对于std::setstd::map这样的容器,在插入时需要比较,那么就需要把实参先构造为一个临时对象。这样emplace的优势就没有了。

下面两次调用就满足上面的条件:

1
2
vs.emplace_back("xyzzy");
vs.emplace_back(50, 'x');

在决定使用emplace后,有两个问题值得考虑。第一个是资源管理的问题。假设你有一个容器:

1
std::list<std::shared_ptr<Widget>> ptrs;

Widget需要的自定义销毁函数是:

1
void killWidget(Widget* pWidget);

根据Item21,这种情况下我们没办法用std::make_shared了。insert版本是:

1
ptrs.push_back(std::shared_ptr<Widget>(new Widget, killWidget));

或:

1
ptrs.push_back({new Widget, killWidget});

无论哪种情况,都要构造出一个临时对象。这不就是emplace能避免的吗?

1
ptrs.emplace_back(new Widget, killWidget);

但注意,临时对象带来的好处远比它的构造和析构成本要大得多。考虑一种情况:

  1. 我们构造了一个临时对象temp,持有new Widget的结果。
  2. 容器扩张时抛了个异常。
  3. 异常传播到外层,temp被销毁,Widget*被释放。

而emplace版本则是:

  1. new Widget的结果,一个裸指针,传进了emplace_back函数内。
  2. 容器扩张时抛了个异常。
  3. 没有智能指针持有前面的裸指针,内存泄漏。

类似的问题也会出现在每个RAII类中。将裸指针(或其它未受保护的资源)通过完美转发的方式传递进emplace函数后,在RAII对象构造之前,有个窗口期。正确的方式是:

1
2
std::shared_ptr<Widget> spw(new Widget, killWidget); 
ptrs.push_back(std::move(spw));

或:

1
2
std::shared_ptr<Widget> spw(new Widget, killWidget);
ptrs.emplace_back(std::move(spw));

无论哪种方式都要先构造对象,此时emplace和insert就没什么区别了。

第二个问题是emplace与explicit构造函数的相互作用。想象你有一个正则表达式的容器:

1
std::vector<std::regex> regexes;

有一天你写了这么一行代码:

1
regexes.emplace_back(nullptr);

然后编译器居然不报错!nullptr怎么可能是正则表达式呢?

std::regex r = nullptr是没办法编译通过的。而regexes.push_back(nullptr)也是非法的。

问题在于std::regex有一个接受const char*的析构函数:

1
std::regex upperCaseWord("[A-Z]+");

但它是explicit的,因此下面这么用会报错:

1
2
std::regex r = nullptr;
regexes.push_back(nullptr);

但显式调用构造函数是可以的:

1
std::regex r(nullptr);

不幸的是emplace函数中就是这么构造对象的,能编译,但运行结果未定义。

下面两种很类似的构造方式,但结果不同:

1
2
std::regex r1 = nullptr;          // Error
std::regex r2(nullptr); // OK

第一种是复制初始化,第二种是直接初始化。复制初始化不允许使用explicit构造函数,而直接初始化则可以。emplace函数中执行的是对象的直接初始化,而insert函数中则是复制初始化。

因此当你使用emplace的时候,注意看一下你传递的类型对不对,因为它会在你没注意到的时候绕开explicit的限制,然后制造一个大新闻。

目录

Item38: 知道线程句柄析构时的各种行为

上节介绍了可join的std::thread对应一个可运行的底层线程,而未推迟的future也可能对应一个OS线程。这里把它们都称为OS线程的句柄。

很有趣的是,std::thread和future在析构的行为上非常不同。析构一个可join的std::thread会导致程序终止,而析构一个future有时像是做了隐式的join,有时像是做了隐式的detach,有时两者都不是。总之它不会导致程序终止。

我们首先从这样一个发现开始:future就是执行者将结果返回给调用者的一个管道。执行者(通常是异步执行)将计算结果写到管道中(例如std::promise对象),调用者再通过future拿到结果。

1
2
3
----------                                          ----------
| Caller | <--- future ------------ std::promise ---| Callee |
---------- ----------

但结果保存在哪了?执行者可能在调用者调用get之前就结束了,所以结果一定不会在执行者对应的std::promise对象中,它会在执行者结束时析构掉。

结果也不可能在future对象中,因为std::future有可能用来创建std::shared_future,然后被复制很多遍。如果结果在future中,那么结果也会被复制很多遍。我们知道有些结果类型是不能复制的,因此不可能在future对象中。

那么答案就是结果保存在std::promise和future之外,且需要是可共享状态。C++标准中没有规定结果的类型,编译器可以自行实现。

1
2
3
----------                ------------------                    ----------
| Caller | <--- future ---| Result(Shared) |--- std::promise ---| Callee |
---------- ------------------ ----------

与future关联的这个共享状态就决定了future的析构行为,尤其:

  • 对于通过std::async启动的未推迟的task,最后一个与之关联的future在析构时会阻塞,直到这个task完成。本质上,这个析构就是对task所在的线程调用了一次join。
  • 其它future的析构都只是简单的析构这个对象。这些析构就是对底层线程调用了detach。对于被推迟的task,当它关联的最后一个future析构后,这个task就永远不会被执行了。

简单来说就是有一种正常行为和一个例外。正常行为就是future的析构只析构future对象,它既不会join也不会detach。而当以下条件都满足时,应用例外规则:

  • future关联着由std::async创建的共享状态。
  • task的启动策略是std::launch::async,包括调用std::async时显式指定该策略,也包括调用者使用了默认策略,而系统选择了该策略。
  • 它是最后一个关联共享状态的future。

以上条件都满足时,future的析构会对底层线程调用join。

为什么对于由std::async启动的未推迟的task会有这个例外?就我(Scott Meyers)所知,C++标准委员会想避免隐式detach引起的问题(见Item37),但又不想像对待可join的std::thread那样使用“程序终止”这么激进的策略,所以最终他们妥协了,决定隐式使用join。这个决定并不是毫无争议,一直有声音想在C++14中将这个行为废弃掉,但最终它还是保留了下来。

future的API上没办法知道它是不是关联一个共享状态,因此没办法知道随便一个future的析构会不会阻塞。这导致了一些有趣的潜在状况:

1
2
3
4
5
6
7
std::vector<std::future<void>> futs; // 析构时可能阻塞
class Widget { // 析构时可能阻塞
public:
...
private:
std::shared_future<double> fut;
};

当然,如果你知道某个future肯定不满足例外条件,你就能确定它的析构不会阻塞。例如,当我们使用std::packaged_task时,它返回的future就不与std::async创建的共享状态相关联,因此我们可以确定这样的future的析构是不会阻塞的。

std::packaged_taskstd::function类似,都是对某个callable的对象的包装。区别在于std::packaged_task会返回一个future。我们可以用std::packaged_task创建一个std::thread来运行callable对象,结果通过future得到。

当然std::packaged_task也可以通过std::async来运行,但这样就没有理由用std::packaged_task了,直接用std::async更方便。

1
2
3
4
5
6
{
std::packaged_task<int()> pt(calcValue);
auto fut = pt.get_future();
std::thread t(std::move(pt));
... // see below
}

“…”中对t可能的三种操作:

  • 没有对t作任何操作。这样结束时t还是可join的,导致程序终止。
  • t调用了join。这样fut析构时就不需要阻塞了。
  • t调用了detach。这样fut析构时也不需要调用detach了。

结论就是,对于由std::packaged_task得到的future,你不需要怎么关心它的析构行为。

Item39: 考虑用一个void future来进行只运行一次的事件通信

当需要进行事件通信时,一种显然的方式就是通过条件变量:

1
2
std::condition_variable cv;
std::mutex m;

通知方的代码很简单:

1
2
...
cv.notify_one();

接收方的代码就有点复杂了:

1
2
3
4
5
6
...
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk);
}
...

上面的代码并不正确。首先,这种方法的味道(code smell)不好,就表示可能藏bug。这里的第一个问题在于,是否有必要使用std::mutexstd::mutex是用来保护共享状态的,但很有可能通知方和接收方并没有共享什么东西,那么这个std::mutex就只是为了条件变量才构造的,逻辑上并不需要它。

即使用到了std::mutex,仍然有两个问题:

  • 条件变量不能保存通知状态,因此如果通知方在接收方开始wait前就调用了notify_one,接收方就会hang在那。
  • 条件变量的wait有可能在条件并不满足(未通知)时结束(假醒),因此需要有办法知道我们等待的条件是否真的满足了。接收方自己当然没办法知道这个事情(否则它就不需要条件变量了)。

以上两个问题的一种解法是使用一个共享状态,而不是条件变量:

1
2
3
4
5
6
7
8
9
std::atmoic<bool> flag(false);
// 通知方
...
flag = true;
// 接收方
...
while (!flag) {
...
}

这种方法的好处是不需要mutex,能保存通知状态,不需要处理假醒。但它的问题在于接收方在阻塞时需要不停的查询状态,CPU开销很大。

可以把共享状态与条件变量结合起来使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::condition_variable cv;
std::mutex m;
bool flag(false); // not std::atomic
// 通知方
...
{
std::lock_guard<std::mutex> g(m);
flag = true;
}
cv.notify_one();
// 接收方
...
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [] { return flag; }); // use lambda to avoid spurious wakeups
...
}
...

这种方法避免了我们讲到的几个问题,但它的设计不好:很多共享变量;两套机制来做同一件事;逻辑暴露在外,容易写错。简单来说就是这种方法不够干净。

一种更干净的方法是接收方等待一个future,而通知方通过给这个future赋值来进行通知。Item38提到了std::promise代表了一个通信通道的发送端,而future则代表了接收端。这样的通道可以用于任何需要通信的场合。

方案很简单,通知方要持有一个std::promise对象,而接收方持有对应的future。通知方通过调用std::promiseset_value来写入一条消息,接收方通过future的wait来等待消息。

无论是std::promisestd::future还是std::shared_future都是模板类型,需要一个类型参数,也就是消息的类型。但我们只关心通知本身,不需要消息有类型,最合适的就是void

1
2
3
4
5
6
7
std::promise<void> p;
// 通知方
...
p.set_value();
// 接收方
...
p.get_future().wait();

没有mutex,没有条件变量,没有共享状态,没有假醒,是不是很完美?不完全是。Item38提到std::promise背后实际上有一个共享状态,就意味着它的构造包含着一次内存分配和释放的开销。

而且,std::promise只能赋值一次。重复对std::promise的赋值是没有意义的,接收方感知不到。因此这条通信通道只能使用一次,这是与前述方案最大的区别。条件变量和共享状态总是可以重复利用的。

“只有一次”的通信有时候也是很有用的。举个例子,有时候我们想创建一个被暂停的线程,需要等待一个事件后才开始工作,就可以用std::promise来实现:

1
2
3
4
5
6
7
8
9
10
11
12
std::promise<void> p;

void react();
void detect() {
std::thread t([] {
p.get_future().wait();
react();
});
...
p.set_value();
...
}

这里我们创建了一个暂停的线程,它会等待p被赋值后才执行react。需要一个暂停线程的地方很多,比如我们想避免线程创建的成本,比如想在线程真正工作前设置一下优先级什么的(std::thread没有这种接口,但我们可以通过native_handle获得OS线程的句柄来做这样的事情)。

Item37中提到用ThreadRAII类来代替直接的std::thread会更好,我们改写一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::promise<void> p;

void react();
void detect() {
ThreadRAII tr(
std::thread([] {
p.get_future().wait();
react();
}),
ThreadRAII::DtorAction::join
);
... // 注意这里
p.set_value();
...
}

上面的代码有个问题:第一个“…”处如果抛了异常,p还没有赋值,因此tr还在阻塞中,因此它是可join的,因此tr析构会导致程序终止。

这个问题有很多解法,这里就不赘述了。最后说一下如何把上面的通信过程由一对一改成一对多,即一个通知方唤醒多个接收方:把std::future换成std::shared_future即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
std::promise<void> p;

void react();
void detect() {
auto sf = p.get_future().share(); // std::shard_future
std::vector<std::thread> vt;
for (int i = 0; i < threadForRun; ++i) {
vt.emplace_back(
std::thread([sf] { // 必须值捕获
sf.wait();
react();
})
);
}
...
p.set_value();
...
for (auto& t: vt) {
t.join();
}
}

注意各个线程持有的std::shared_future必须是值,不能是引用。

Item40: 使用std::atomic应对并发,而用volatile访问特殊内存

本节内容不重复了,大家都比较熟悉。简单列一下结论:

  1. std::atomic是真正的原子操作,用于并发,但不能用于访问特殊内存(如硬件资源)。
  2. volatile在某些语言中可以用于并发,但在C++中不能用于并发,它不保证原子的读写,也不保证指令的先后顺序。它的用途是访问上面说的特殊内存。
  3. 可以结合起来,用于需要并发访问的特殊内存:volatile std::atomic<int> vai
  4. 访问std::atomic要比访问普通变量慢得多,它的内存屏障也会限制编译器的指令重排等优化,因此不要滥用std::atomic

目录

全局变量

很多人都知道代码中要尽量避免使用全局变量,那么全局变量有什么问题,为什么大家觉得它不好?因为全局变量是一种全局状态,而可变的全局状态破坏了理想的程序。

阅读全文 »

The Interface Principle in C++

原文地址

背景

C++的接口原则对于写出既有表现力,又保证了封装性的代码是非常重要的,当前即有语言特性与之相关,且未来还可能有更多特性来增强这一原则,值得我们注意。

本文用到的名词:

  • 方法:类的成员函数。
  • 函数:非成员函数。

非成员(非友元)函数

Effective C++的Item23中,Scott Meyers鼓励我们将只需要类的公开方法就能实现的函数移到类外面。下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
class Circle
{
public:
explicit Circle(double radius) : m_radius(radius) {}

double getRadius() const {return m_radius;}
double getPerimeter() const {return 2 * Pi * m_radius;}
double getArea() const {return Pi * m_radius * m_radius;}

private:
double m_radius;
};

注意下面两个方法,都只用到了Circle的其它公开方法:

1
2
double getPerimeter() const {return 2 * Pi * getRadius();}
double getArea() const {return Pi * getRadius() * getRadius();}

那么把它们移到Circle外面作为非成员函数,就遵守了Meyers原则,增强了Circle类的封装性:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Circle
{
public:
explicit Circle(double radius) : m_radius(radius) {}

double getRadius() const {return m_radius;}

private:
double m_radius;
};

double getPerimeter(Circle const& circle) {return 2 * Pi * circle.getRadius();}
double getArea(Circle const& circle) {return Pi * circle.getRadius() * circle.getRadius();

另一方面,这样也减少了Circle本身的代码量,重构时涉及的代码更少,更稳定。

以下是应用这一原则的步骤:

  • 确认指定方法是否只依赖类的其它公开方法(或改起来也比较容易)。
  • 创建一个同名的非成员函数。
  • 将该类型作为函数的第一个参数
    • 如果原方法不是const方法,参数类型就是非const引用。
    • 如果原方法是const方法,参数类型就是const引用。
  • 将实现代码复制过来,并在每个调用类公开方法的地方加上参数的名字。

注意,要保证新函数与旧方法同名。有时候我们会不喜欢给一个非成员函数起名为getPerimeter,更愿意起名为getCirclePerimeter,这样会显得更具体一些。但这是错的:“Circle”已经出现在第一个参数的类型中了,不管是对人还是对编译器,都不需要在函数名上再加一个“Circle”了。getPerimeter(circle)看起来也要比getCirclePerimeter(circle)更自然。

接口原则

新的Circle类有点令人不安:它有在类外面的功能。这是我们在上一节有意做的,但通常来说类的功能不就是它的接口吗?

上面说对了一半,类的功能就应该是它的接口。但接口也不仅仅包括类的公开方法。这就是“接口原则”要说的。Herb Sutter在Exceptional C++的Item31-34中详细解释了这一原则(见相关文档1和2)。

满足以下条件的非成员函数也是类接口的一部分:

  • 它的某个参数是该类的对象。
  • 它与该类在相同的命名空间
  • 它与该类一同发布,即它们声明在相同的头文件

上节中的getPerimetergetArea就满足这些条件,因此它们也是Circle接口的一部分。换句话说,下面两种调用方式,差别只在于语法:

1
getPerimeter(circle);

VS

1
circle.getPerimeter();

根据接口原则,这两种表达方式都是在调用Circle类的getPerimeter功能。

ADL(参数依赖查找):接口原则与命名空间配合良好

当引入命名空间之后,接口原则可能会有问题:调用函数时要加命名空间,而方法则不用。也就是函数与方法开始有不一致了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
namespace geometry
{

class Circle
{
public:
explicit Circle(double radius) : m_radius(radius) {}

double getRadius() const {return m_radius;}

private:
double m_radius;
};

double getPerimeter(Circle const& circle) {return 2 * Pi * circle.getRadius();}
double getArea(Circle const& circle) {return Pi * m_radius * circle.getRadius();}

} // end of namespace geometry

现在函数的调用方式:

1
geometry::getArea(circle);

而如果是方法的话,调用方式:

1
circle.getArea();

这两者的不一致对接口原则而言是一种挑战,因为接口原则需要函数与方法间只有语法上的区别,不应该有其它信息上的区别。

幸好C++有参数依赖查找(ADL),又称Koenig查找:将参数类型所在的命名空间的所有函数声明带到当前作用域,来解决名字查找的问题。上例中,在查找getArea时,circle触发了ADL,从而geometry的所有声明被带到当前作用域,其中也包括了getArea,因此不加命名空间编译器仍然能找到这个函数:

1
getArea(circle);

泛型代码

泛型代码中,非成员函数能发挥更大的作用。

前文中我们说过不建议在函数名字中嵌入参数类型的名字。事实上名字起的通用一些还有助于将其用于泛型代码。假设你还有一个Rectangle类也能计算周长,因此你会为其实现一个getPerimeter

1
double getPerimeter(Rectangle const& rectangle);

之后我们就可以把getPerimeter用到泛型代码中了:

1
2
3
4
5
6
template <typename Shape>
void operateOnShape(Shape const& shape)
{
double perimeter = getPerimeter(shape);
....
}

而且,有些类型不是类(比如内置类型)或你没办法给它加方法(比如三方库代码),这时候想为其增加一个泛型代码中可以用的功能,唯一可行的方法就是通过非成员函数。例如C++11增加的std::beginstd::end,设计为非成员函数的一大因素就是为了处理内置数组类型。

(实际上这就是另一种实现OO和多态的思路:Traits,很多人觉得它是比继承更好的OO方案,比如Rust就只有Traits没有继承。C++中用Traits还能减少类型间的耦合。)

C++的统一函数调用语法?

C++已经有一些语言特性在支持接口原则了,ADL就是其中最显眼的一个。未来还可能会有更多语言特性与接口原则相关。

std::invoke(C++17)允许你用一套语法同时处理方法和函数:

1
std::invoke(f, x, x1, ..., xn);
  • 如果f是方法,则调用x.f(x1, ..., xn)
  • 如果f是函数,则调用f(x, x1, ..., xn)

已经有提案(见相关文档3)建议语言中直接支持以下两种语法的等价性:

1
f(x, x1, ..., xn);

如果fx的一个方法,则等价于x.f(x1, ..., xn)。而:

1
x.f(x1, ..., xn);

如果f是函数,则等价于f(x, x1, ..., xn)

相关文档

What’s In a Class

本节主要介绍下这篇文章没有被前面内容覆盖到的东西。

C风格的OO

接口原则实际上起源于C风格的OO。例子:

1
2
3
4
5
6
7
struct _iobuf { /*...data goes here...*/ };
typedef struct _iobuf FILE;

FILE* fopen(const char* filename, const char* mode);
int fclose(FILE* stream);
int fseek (FILE* stream, long offset, int origin);
long ftell (FILE* stream);

这里FILE就是一个类型,而fopenfclosefseekftell是它的公开方法,也就是它的接口。它的C++形式为:

1
2
3
4
5
6
7
8
class FILE {
public:
FILE(const char* filename, const char* mode);
~FILE();
int seek(long offset, int origin);
long tell();
...
};

从接口的角度,这两种形式没有什么区别。

类依赖什么

假设我们为类X实现operator<<,有两种方式:

1
2
3
4
5
6
7
class X {
...
};
ostream& operator<<(ostream& o, const X& x) {
...
return o;
}

以及

1
2
3
4
5
6
7
8
9
10
11
class X {
public:
virtual ostream& print(ostream& o) {
...
return o;
}
...
};
ostream& operator<<(ostream& o, const X& x) {
return x.print();
}

传统上我们会认为第一种方式更好,因为X没有依赖ostream。但实际上对吗?

  1. 根据接口原则,operator<<参数中有X,且和X一同被引入,它就是X的一部分。
  2. operator<<参数中有ostream,因此operator<<依赖于ostream
  3. 因此X也依赖于ostream

所以第一种方式根本没有减少X的依赖。

一些有意思的结果

如果AB是类,而f(A, B)是一个非成员函数:

  • 如果Af在一起,那么f就是A的一部分,那么A就依赖B
  • 如果Bf在一起,那么B就依赖A
  • 如果三个都在一起,那么AB就相互依赖。

下面更有意思。假设有类AB,且A有方法A::g(B),那么有:

  • 显然A依赖B
  • 假设AB在一起,A::g(B)的参数中有B,且和B在一起,那么A::g(B)也是B的一部分。显然A::g(B)的参数中也有A,即B的一部分依赖A,那么B也依赖A。因此AB是相互依赖的关系。

“PartOf”的关系到底有多强

接口原则一直在强调非成员函数也可以是类接口的“一部分”,那么这个关系到底有多强?

答案是比成员函数低一些。

标准库中有两个future模板类:std::futurestd::shared_future,它们的差别在很多场合中并不重要,因此下文中提到的future同时指这两个类型。

Item35: 优先基于task编程,而不是基于thread

如果你想异步执行函数doAsyncWork,有两种选择。

  1. 创建一个thread:

    1
    2
    int doAsyncWork();
    std::thread t(doAsyncWork);
  2. 或者创建一个task:

    1
    auto fut = std::async(doAsyncWork); // "fut" for "future"

task通常要比thread好,原因如下:

  1. 基于task的代码往往更少。
  2. 基于task更容易得到函数的返回值:调用future的get方法。
  3. future的get方法还能拿到函数抛出的异常,而thread中如果函数抛了异常,进程就挂掉了。

它们之间更本质的差别在于,基于task的方法有着更高的抽象层次,而无需关心底层的线程管理。下面是C++中”线程”的三种不同层次的概念:

  • 硬件线程:真正的运算线程,目前每个CPU核可以提供一个或多个线程。
  • 软件线程(OS线程):OS提供的线程,OS会负责管理和调度这些线程。通常OS线程可以远多于硬件线程。
  • std::thread:C++标准库提供的线程类,底层对应一个OS线程。有些情况下std::thread没有对应的OS线程:刚刚构造好;已经调用过join;已经调用过detach

OS线程数量是有上限的,超过上限时再创建会抛std::system_error,即使doAsyncWorknoexcept,调用std::thread t(doAsyncWork)也有可能抛这个异常。好的软件需要处理这样的异常,但怎么处理?

  • 方法一:只在当前线程调用doAsyncWork,但这样就不是并发了。
  • 方法二:等待其它线程结束,再创建新线程来执行doAsyncWork。但如果其它线程就在等待doAsyncWork被调用呢?

而且,即使OS线程数量没有到上限,创建过多的OS线程也会导致系统过载,大量资源消耗在线程调度和切换上。

避免系统过载不是一件容易的事情,我们很难确定OS线程与硬件线程间的合适比例,IO密集的线程与CPU密集的线程需要的比例差别很大。

但基于task来开发,把这些问题丢给task,就能简单一点。而std::async就是这么做的:

1
auto fut = std::async(doAsyncWork);

std::async就是把线程管理的难题交给了C++标准库,它会处理诸如out-of-threads的异常等问题。实际上std::async不一定会去创建线程,它允许调度器把这个函数安排在需要它结果(调用fut.wait()fut.get())的线程执行。

用了std::async后,负载均衡的问题仍然在,但现在需要处理它的不再是你了,而是调度器。调度器知道所有线程的情况,因此它处理负载均衡总会比人更好。

当然,std::async没办法解决前面GUI线程的问题,因为调度器不知道你的哪个线程对响应时间的要求最低。此时你可以指定std::launch::async来确保你的函数运行在另一个线程中。

最先进的线程调度器使用了一个系统级别的线程池来避免系统过载,它们通过work-stealing来平衡各硬件线程的负载。C++标准中没有规定要使用线程池或work-stealing,C++11的并发规格中也有一些内容令我们更难实现这样的线程调度器。但一些系统中已经包含了这些内容。当你使用这些系统时,如果你使用task,你就能轻松地用上这些技术,不用自己处理负载均衡、各种异常。

当然,还是有些场景中我们需要直接使用线程:

  • 需要访问底层线程实现的API。std::thread允许你通过native_handle方法获得底层线程的句柄,而std::future没有这样的方法。
  • 需要且能为你的应用优化线程使用,如在特定硬件平台上绕过某个已知的性能缺陷。
  • 需要在C++的并发API之上实现自己的线程技术,如为特定平台实现一个线程池。

Item36: 如果异步是必需的,就指定std::launch::async

std::async不保证你的函数一定是异步执行的,需要指定异步策略。有两个标准策略,都是std::launch中的枚举值。假设f是要通过std::async调用的函数:

  • std::launch::asyncf必须异步执行,比如在另一个线程。
  • std::launch::deferredf只在对应的future的getwait被调用时才执行,且是同步执行。如果没有人调用对应的getwaitf就不会被执行。

std::async的默认策略哪个都不是——是两个策略的or,即下面两个std::async的行为是完全一致的:

1
2
auto fut1 = std::async(f);
auto fut2 = std::async(std::launch::async | std::launch::deferred, f);

默认策略允许调度器自己选择是在另一个线程执行还是在当前线程执行;是立即执行还是等到getwait时执行。它有几个有趣的特性:

  • 无法预测f是否与当前线程并发执行,因为调度器有可能选择std::launch::deferred
  • 无法预测f是否在调用getwait的另一个线程执行。
  • 可能无法预测f是否会执行。

默认策略不太适合与TLS(Thread Local Storage)一起用,因为你不知道f到底在哪个线程执行,因此也就不知道f中访问的TLS变量到底是哪个线程的TLS变量。它也会导致wait循环超时,因为对task调用wait_forwait_until会导致它使用std::launch::deferred,进而导致下面这个看起来会结束的循环永远不结束:

1
2
3
4
5
6
7
8
9
10
using namespace std::literals;

void f() {
std::this_thread::sleep_for(1s);
}

auto fut = std::async(f);
while (fut.wait_for(100ms) != std::future_status::ready) {
...
}

如果f是在另一个线程执行的,上面的循环就没问题;但如果f是deferred,fut.wait_for就会一直返回std::future_status::deferred,导致循环永不结束。

这类bug很容易在开发和单元测试中被漏掉,因为调度器通常只在系统负载很高时采用deferred策略。

解决方案很简单:检查future是不是deferred,如果是,就不进循环。但我们没办法直接询问future是不是deferred,需要用wait_for来绕一下:

1
2
3
4
5
6
7
8
9
auto fut = std::async(f);
if (fut.wait_for(0s) == std::future_status::deferred) {
...
} else {
while (fut.wait_for(100ms) != std::future_status::ready) {
...
}
...
}

上述场景的要点在于,当满足以下条件时,使用std::async的默认策略才是好的:

  • task不需要与调用getwait的线程并发执行。
  • 无所谓访问哪个TLS变量。
  • 要么能确保有人会调用future的getwait,要么f执不执行都可以。
  • 调用了wait_forwait_until的代码要保证能处理deferred。

如果没办法保证以上几点,你需要确保你的task运行在另一个线程中,就指定std::launch::async

1
auto fut = std::async(std::launch::async, f);

我们可以自己包装一个函数,确保使用std::launch::async

1
2
3
4
template <typename F, typename... Ts>
inline std::future<typename std::result_of<F(Ts...)>::type> reallyAsync(F&& f, Ts&&... params) {
return std::async(std::launch::async, std::forward<F>(f), std::foward<Ts>(params)...);
}

C++14版本可以不用写那么复杂的返回类型:

1
2
3
4
template <typename F, typename... Ts>
inline auto reallyAsync(F&& f, Ts&&... params) {
return std::async(std::launch::async, std::forward<F>(f), std::foward<Ts>(params)...);
}

Item37: 令std::thread在所有路径下都不可join

每个std::thread对象都处于两种状态下:可join、不可join。可join的std::thread对应一个可运行或运行中的底层线程,例如被阻塞、未调度或已运行完成的线程都是可join的。

而其它状态的std::thread就是不可join的:

  • 默认构造状态的std::thread:不对应底层线程。
  • 被移动过的std::thread:底层线程现在由其它std::thread管理。
  • 已调用过joinstd::thread:底层线程已结束。
  • 已调用过detachstd::threaddetach会切断std::thread和底层线程的联系。

重点来了,如果std::thread的析构函数被调用时它是可join的,程序就会终止。下面给一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
constexpr auto tenMillion = 10'000'000;
bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion) {
std::vector<int> goodVals;
std::thread t([&filter, maxVal, &goodVals] {
for (auto i = 0; i <= maxVal; ++i) {
if (filter(i)) {
goodVals.push_back(i);
}
}
});
auto nh = t.native_handle(); // use t's native handle to set t's priority
...
if (conditionsAreSatisfied()) {
t.join();
performComputation(goodVals); // computation was performed
return true;
}
return false; // computation was not performed
}

这里我们直接构造std::thread而不用std::async的原因在于,我们需要拿到底层线程的句柄来设置优先级。

上面这段代码,如果最后走到了false分支,或中间抛了异常,就会遇到构造了一个可join的std::thread的问题,程序就会终止。可以改进的一点是在开始设置t为暂停状态,Item39会介绍如何做到这点。

回到这段代码中,为什么std::thread会有这个特性?原因在于另外两种处理方式会更糟:

  • 隐式的join。本例中,t的析构会等其中的计算全部做完,可能合理,但也可能造成难以debug的性能问题。
  • 隐式的detach。这样,std::thread对象和底层线程间的联系被切断,当t析构后,底层线程仍然在执行,可能会访问到已析构的goodVals,更加难以debug。

标准委员会认为析构一个可join的std::thread太可怕了,必须要禁止掉。因此你有责任确保所有情况下的std::thread都不可join。这可以通过包装一个RAII类来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ThreadRAII {
public:
enum class DtorAction {join, detach};

ThreadRAII(std::thread&& t, DtorAction a)
: action(a), t(std::move(t)) {}

~ThreadRAII() {
if (t.joinable()) {
if (action == DtorAction::join) {
t.join();
} else {
t.detach();
}
}
}

std::thread& get() {return t;}
private:
DtorAction action;
std::thread t;
};

几个值得注意的点:

  • 构造函数只接受std::thread的右值,因为std::thread只能移动不能复制。
  • 构造函数的参数顺序与成员顺序相反,因为参数里把重要的放前面,不重要的放后面更符合直觉;而成员顺序里依赖少的放前面,依赖多的放后面更合理。action不如t重要,因此参数里放后面;没有任何依赖,因此成员中放前面。
  • 提供一个get接口避免了为ThreadRAII实现一整套std::thread的接口。
  • ThreadRAII的析构函数中,在调用t.join()t.detach()前,需要先调用t.joinable(),因为有可能t已经被移动过了。这个析构函数中对t的访问是否有竞态?如果这里有竞态,即ThreadRAII析构时还有其它人在调用它底层的std::thread的成员函数,那么这种竞态不是ThreadRAII造成的,而是你的代码本身就有的。

应用ThreadRAII到我们前面的代码中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion) {
std::vector<int> goodVals;
ThreadRAII t(
std::thread([&filter, maxVal, &goodVals] {
for (auto i = 0; i <= maxVals; ++i) {
if (filter(i)) {
goodVals.push_back(i);
}
}
}),
ThreadRAII::DtorAction::join
);
auto nh = t.get().native_handle();
...
if (conditionsAreSatisfied()) {
t.get().join();
performComputation(goodVals);
return true;
}
return false;
}

当然ThreadRAII还是有可能阻塞的问题,也许这时候能打断这个线程会更好,但C++11没有提供这样的功能,这个主题也超出了本书的范围。

Item17解释了当一个类型定义了析构函数,编译器就不会自动为它生成移动函数了。如果你想让ThreadRAII可移动,就自己声明两个默认的移动函数。

目录