MPSCArrayQueue源码分析

技术

JCTools是服务虚拟机并发开发的工具,提供一些JDK没有的并发数据结构辅助开发。

聚合四种 SPSC/MPSC/SPMC/MPMC 数据变量的并发队列:

  • SPSC:单个生产者对单个消费者(无等待、有界和无界都有实现)
  • MPSC:多个生产者对单个消费者(无锁、有界和无界都有实现)
  • SPMC:单生产者对多个消费者(无锁 有界)
  • MPMC:多生产者对多个消费者(无锁、有界)

这里简要根据https://gitee.com/eric\_ds/baseutil中的MPSCArrayQueue版本源码进行分析来理解这个高性能的的无锁队列的实现原理。

MPSCArrayQueue源码


      1. `package com.jfireframework.baseutil.concurrent;`
2. 
3. `import java.lang.reflect.Array;`
4. `import java.util.Arrays;`
5. `import java.util.Collection;`
6. `import java.util.Iterator;`
7. `import java.util.NoSuchElementException;`
8. `import java.util.Queue;`
9. `import com.jfireframework.baseutil.reflect.UNSAFE;`
10. `abstract class PadFor64Bit`
11. `{`
12. `// 64长度的缓存行,要进行填充,需要8个byte。`
13. `long p1, p2, p3, p4, p5, p6, p7;`
14. 
15. `public static long noHuop(PadFor64Bit instance)`
16. `{`
17. `return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;`
18. `}`
19. `}`
20. 
21. `abstract class ProducerIndex extends PadFor64Bit`
22. `{`
23. `volatile long producerIndex;`
24. `private static final long OFFSET = UNSAFE.getFieldOffset("producerIndex", ProducerIndex.class);`
25. 
26. `boolean casProducerIndex(long index)`
27. `{`
28. `//对producerIndex加1,注意一点,和disruptor不同的是,producerIndex的值是一直增加的`
29. `return UNSAFE.compareAndSwapLong(this, OFFSET, index, index + 1);`
30. `}`
31. `}`
32. 
33. `abstract class Pad2 extends ProducerIndex`
34. `{`
35. `public long p1, p2, p3, p4, p5, p6, p7;`
36. 
37. `public static long noHuop(Pad2 instance)`
38. `{`
39. `return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;`
40. `}`
41. `}`
42. 
43. `abstract class ComsumerIndex extends Pad2`
44. `{`
45. `volatile long consumerIndex;`
46. `private static final long OFFSET = UNSAFE.getFieldOffset("consumerIndex", ComsumerIndex.class);`
47. 
48. `void orderedSetComsumerIndex(long index)`
49. `{`
50. `UNSAFE.putOrderedLong(this, OFFSET, index);`
51. `}`
52. `}`
53. 
54. `abstract class Pad3 extends ComsumerIndex`
55. `{`
56. `long p1, p2, p3, p4, p5, p6, p7;//缓存行填充`
57. 
58. `public static long noHuop(Pad3 instance)`
59. `{`
60. `return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;`
61. `}`
62. `}`
63. 
64. `abstract class ProducerIndexLimit extends Pad3`
65. `{`
66. `volatile long producerIndexLimit = 0;`
67. `private static final long OFFSET = UNSAFE.getFieldOffset("producerIndexLimit", ProducerIndexLimit.class);`
68. 
69. `void orderedSetProducerIndexLimit(long limit)`
70. `{`
71. `UNSAFE.putOrderedLong(this, OFFSET, limit);`
72. `}`
73. `}`
74. 
75. `abstract class Pad4 extends ProducerIndexLimit`
76. `{`
77. `long p1, p2, p3, p4, p5, p6, p7;`
78. 
79. `static final int availableBufferOffset = UNSAFE.arrayBaseOffset(new int[0].getClass());`
80. `static final int bufferOffset = UNSAFE.arrayBaseOffset(Object[].class);`
81. `static final int availableBufferScaleShift;`
82. `static final int bufferScaleShift;`
83. 
84. `static`
85. `{`
86. `int availableBufferScale = UNSAFE.arrayIndexScale(new int[0].getClass());`
87. `if (availableBufferScale == 4)`
88. `{`
89. `availableBufferScaleShift = 2;`
90. `}`
91. `else if (availableBufferScale == 8)`
92. `{`
93. `availableBufferScaleShift = 3;`
94. `}`
95. `else`
96. `{`
97. `throw new IllegalArgumentException();`
98. `}`
99. `int bufferScale = UNSAFE.arrayIndexScale(Object[].class);`
100. `if (bufferScale == 4)`
101. `{`
102. `bufferScaleShift = 2;`
103. `}`
104. `else if (bufferScale == 8)`
105. `{`
106. `bufferScaleShift = 3;`
107. `}`
108. `else`
109. `{`
110. `throw new IllegalArgumentException();`
111. `}`
112. `}`
113. 
114. `public static long noHuop(Pad4 instance)`
115. `{`
116. `return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;`
117. `}`
118. `}`
119. 
120. `public class MPSCArrayQueue<E> extends Pad4 implements Queue<E>`
121. `{`
122. 
123. `public MPSCArrayQueue(int capacity)`
124. `{`
125. `int size = 1;`
126. `int indexShift = 0;`
127. `//size的取值为2次方的值,目的是为了下面的index&mask的取值结果是数组索引下标,最终为大于或等于capacity的2次方幂`
128. `while (size < capacity && size > 0)`
129. `{`
130. `size <<= 1;`
131. `//2次幂的数量 2 2^2 2^3 以2为底,等比为2的等比数列的数量`
132. `indexShift++;`
133. `}`
134. `if (size > 0)`
135. `{`
136. `this.indexShift = indexShift;`
137. `mask = size - 1;`
138. `//存放数据的buffer`
139. `buffer = new Object[size];`
140. `//可用的buffer`
141. `availableBuffers = new int[size];`
142. `//给数组中的每个元素赋一个初始值为-1`
143. `Arrays.fill(availableBuffers, -1);`
144. `}`
145. `else`
146. `{`
147. `throw new IllegalArgumentException("capacity 无法计算得到其最小的2次方幂");`
148. `}`
149. `}`
150. 
151. `protected final Object[] buffer;`
152. `protected final int mask;`
153. `protected final int[] availableBuffers;`
154. `protected final int indexShift;`
155. 
156. `boolean isAvailable(long address, int flag, int[] availableBuffers)`
157. `{`
158. `return UNSAFE.getIntVolatile(availableBuffers, address) == flag;`
159. `}`
160. 
161. `void setAvailable(long index)`
162. `{`
163. `//mask = size -1`
164. `//index < size -1 的`
165. `// size - 1为2的n次方幂`
166. `// n 其实就是indexShift`
167. `//>>表示右移,如果该数为正,则高位补0,若为负数,则高位补1;`
168. `//>>>表示无符号右移,也叫逻辑右移,即若该数为正,则高位补0,而若该数为负数,则右移后高位同样补0`
169. `//那么index >>> indexShift表示将index向右移indexShift位`
170. `//看一下极端情况当size - 1 = 2 ^ 3时,indexShift的值为3,这时如果index=size-1即8二进制为1000时向右移3位时值为1 所有小于8的值进行这种移位得到的结果都是0`
171. `//这也就说明,当flag为1时表示已经满了`
172. `int flag = (int) (index >>> indexShift);`
173. `long address = ((index & mask) << availableBufferScaleShift) + availableBufferOffset;`
174. `UNSAFE.putOrderedInt(availableBuffers, address, flag);`
175. `}`
176. 
177. `/**`
178. `* 获取下一个可以使用的生产者下标`
179. `*`
180. `* @return`
181. `*/`
182. `long nextProducerIndex()`
183. `{`
184. `long pIndex = producerIndex;`
185. `long pLimit = producerIndexLimit;`
186. `if (pIndex < pLimit)`
187. `{`
188. `if (casProducerIndex(pIndex))`
189. `{`
190. `return pIndex;`
191. `}`
192. `}`
193. `do`
194. `{`
195. `pIndex = producerIndex;`
196. `if (pIndex < pLimit)`
197. `{`
198. `if (casProducerIndex(pIndex))`
199. `{`
200. `return pIndex;`
201. `}`
202. `}`
203. `else`
204. `{`
205. `//mask的值是size-1`
206. `//consumerIndex表示消费者的索引位置`
207. `//进入这个判断分支就代表pIndex>=pLimit`
208. `//consumerIndex的值为0时,pLimit的值为size(因为mask的值为size-1)`
209. `pLimit = producerIndexLimit = consumerIndex + mask + 1;`
210. `if (pIndex >= producerIndexLimit)`
211. `{`
212. `// 队列已满`
213. `return -1;`
214. `}`
215. `else`
216. `{`
217. `if (casProducerIndex(pIndex))`
218. `{`
219. `return pIndex;`
220. `}`
221. `}`
222. `}`
223. `} while (true);`
224. `}`
225. 
226. `Object get(long index)`
227. `{`
228. `long address = ((index & mask) << bufferScaleShift) + bufferOffset;`
229. `return UNSAFE.getObject(buffer, address);`
230. `}`
231. 
232. `void set(Object value, long index)`
233. `{`
234. `//计算数组下标`
235. `//index&mask是计算数组的下标`
236. `//bufferOffset=UNSAFE.arrayBaseOffset(Object[].class) 返回Object数组基地址`
237. `// int bufferScale = UNSAFE.arrayIndexScale(Object[].class);`
238. `// if (bufferScale == 4)`
239. `// {`
240. `// bufferScaleShift = 2;`
241. `// }`
242. `// else if (bufferScale == 8)`
243. `// {`
244. `// bufferScaleShift = 3;`
245. `// }`
246. `//bufferScale代表的是数组中每个元素的占用的大小`
247. `//<< 位移运算符表示左移,左移2位相当于乘4`
248. `//当bufferScale大小为8时表示一个元素占用的大小为8,那么第一个位置是(0 <<3)+bufferOffset结果为bufferOffset;`
249. `//第二个元素1<<3 + bufferOffset,相当于1*8 + bufferOffset`
250. `//第三个元素2<<3 + bufferOffset相当于2*8 + bufferOffset`
251. 
252. 
253. `//关于index & mask 当mask为2^2 - 1时,index从一到10对mask取&的值为`
254. `// 0`
255. `// 1`
256. `// 2`
257. `// 3`
258. `// 0`
259. `// 1`
260. `// 2`
261. `// 3`
262. `// 0`
263. `// 1`
264. `//可见,虽然producerIndex的值和consumerIndex的值是一直增加的,但是对buffer的索引address是没有影响的`
265. `//唯一有约束的地方是nextProducerIndex方法中当pIndex >= producerIndexLimit时会停止生产,也就是说消费赶不上生产时会有停止生产的情况出现`
266. `long address = ((index & mask) << bufferScaleShift) + bufferOffset;`
267. `UNSAFE.putObject(buffer, address, value);`
268. `}`
269. 
270. `Object getAndSetNull(long index)`
271. `{`
272. `long address = ((index & mask) << bufferScaleShift) + bufferOffset;`
273. `Object result = UNSAFE.getObject(buffer, address);`
274. `UNSAFE.putObject(buffer, address, null);`
275. `return result;`
276. `}`
277. 
278. `void waitUnitlAvailable(long index)`
279. `{`
280. `int flag = (int) (index >>> indexShift);`
281. `long address = ((index & mask) << availableBufferScaleShift) + availableBufferOffset;`
282. `int[] availableBuffers = this.availableBuffers;`
283. `if (isAvailable(address, flag, availableBuffers) == false)`
284. `{`
285. `while (isAvailable(address, flag, availableBuffers) == false)`
286. `{`
287. `Thread.yield();`
288. `}`
289. `}`
290. `}`
291. 
292. `@Override`
293. `public int size()`
294. `{`
295. `long consumerIndex = this.consumerIndex;`
296. `long producerIndex = this.producerIndex;`
297. `return (int) (producerIndex - consumerIndex);`
298. `}`
299. 
300. `.............`
301. 
302. `@Override`
303. `public void clear()`
304. `{`
305. `long pIndex = producerIndex;`
306. `long cIndex = this.consumerIndex;`
307. `if (pIndex == cIndex)`
308. `{`
309. `return;`
310. `}`
311. `for (long index = cIndex; index < pIndex; index++)`
312. `{`
313. `waitUnitlAvailable(index);`
314. `set(null, index);`
315. `}`
316. `this.consumerIndex = pIndex;`
317. `}`
318. 
319. `@Override`
320. `public boolean add(E e)`
321. `{`
322. `return offer(e);`
323. `}`
324. 
325. `@Override`
326. `public boolean offer(E e)`
327. `{`
328. `long index = nextProducerIndex();`
329. `if (index == -1)`
330. `{`
331. `return false;`
332. `}`
333. `set(e, index);`
334. `setAvailable(index);`
335. `return true;`
336. `}`
337. 
338. `@Override`
339. `public E remove()`
340. `{`
341. `return poll();`
342. `}`
343. 
344. `@SuppressWarnings("unchecked")`
345. `@Override`
346. `public E poll()`
347. `{`
348. `//以与生产者相同的方式来获取flag,如果消费者与生产者用相同的算法得到的flag相同的话,则表示生产者已经生产到这个地方,可以进行消费。`
349. `long cIndex = this.consumerIndex;`
350. `int flag = (int) (cIndex >>> indexShift);`
351. `long address = ((cIndex & mask) << availableBufferScaleShift) + availableBufferOffset;`
352. `final int[] availableBuffers = this.availableBuffers;`
353. `if (isAvailable(address, flag, availableBuffers) == false)`
354. `{`
355. `if (cIndex == producerIndex)`
356. `{`
357. `return null;`
358. `}`
359. `while (isAvailable(address, flag, availableBuffers) == false)`
360. `{`
361. `// assert cIndex < consumerLimit;`
362. `Thread.yield();`
363. `}`
364. `}`
365. `//消费之后,将对应位置上的值置为null`
366. `E e = (E) getAndSetNull(cIndex);`
367. `//移动消费者的指针位置,这种方式在当consumerIndex很大时将获取不到数据`
368. `//注意一点,和disruptor不同的是,consumerIndex的值是一直增加的`
369. `orderedSetComsumerIndex(cIndex + 1);`
370. `return e;`
371. `}`
372. 
373. `@Override`
374. `public E element()`
375. `{`
376. `return peek();`
377. `}`
378. 
379. `@SuppressWarnings("unchecked")`
380. `@Override`
381. `public E peek()`
382. `{`
383. `long pIndex = producerIndex;`
384. `long consumerIndex = this.consumerIndex;`
385. `if (pIndex == consumerIndex)`
386. `{`
387. `return null;`
388. `}`
389. `//这里的原理与上面一样`
390. `int flag = (int) (consumerIndex >>> indexShift);`
391. `long address = ((consumerIndex & mask) << availableBufferScaleShift) + availableBufferOffset;`
392. `int[] availableBuffers = this.availableBuffers;`
393. `if (isAvailable(address, flag, availableBuffers) == false)`
394. `{`
395. `while (isAvailable(address, flag, availableBuffers) == false)`
396. `{`
397. `Thread.yield();`
398. `}`
399. `}`
400. `E e = (E) get(consumerIndex);`
401. `return e;`
402. `}`
403. 
404. `}`


    

这里主要的核心有以下几点:

  • unsafe的操作,通遍都是通过unsafe的cas来进行的,有很多while(cas())进行重试的部分,这块就不再多讲了;
  • size的取值,是最小的以2为首项,公比为2的等比数列最小的大于等于capacity的那个值;
  • mask 的取值为size -1 ,这样做的好处是通过index & mask能取到数组中对应的索引;
  • consumerIndex与producerIndex是一直增加的;
  • 当消费者速度不及生产者时,会停止生产,具体的可以看下上面代码中的注释。
  • index >>> indexShift中的indexShift的取值为2到size之间的等比数列的项数; index分别是指consumerIndex和producerIndex,这个index >>> indexShift的作用是用于 判断生产者和消费者的速度,在消费时位置是否可用。

参考:

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
边-边协同下的边缘智能应用平台 | 第 11 期边缘云主题Meetup
《边-边协同下的边缘智能应用平台》谢皓|火山引擎边缘云边缘智能负责人
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论