WorkerWrapper.java 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613
  1. package com.jd.platform.async.wrapper;
  2. import com.jd.platform.async.callback.DefaultCallback;
  3. import com.jd.platform.async.callback.ICallback;
  4. import com.jd.platform.async.callback.IWorker;
  5. import com.jd.platform.async.exception.SkippedException;
  6. import com.jd.platform.async.executor.timer.SystemClock;
  7. import com.jd.platform.async.worker.DependWrapper;
  8. import com.jd.platform.async.worker.ResultState;
  9. import com.jd.platform.async.worker.WorkResult;
  10. import java.util.*;
  11. import java.util.concurrent.CompletableFuture;
  12. import java.util.concurrent.ExecutorService;
  13. import java.util.concurrent.TimeUnit;
  14. import java.util.concurrent.atomic.AtomicInteger;
  15. /**
  16. * 对每个worker及callback进行包装,一对一
  17. *
  18. * @author wuweifeng wrote on 2019-11-19.
  19. */
  20. public class WorkerWrapper<T, V> {
  21. /**
  22. * 该wrapper的唯一标识
  23. */
  24. private String id;
  25. /**
  26. * worker将来要处理的param
  27. */
  28. private T param;
  29. private IWorker<T, V> worker;
  30. private ICallback<T, V> callback;
  31. /**
  32. * 在自己后面的wrapper,如果没有,自己就是末尾;如果有一个,就是串行;如果有多个,有几个就需要开几个线程</p>
  33. * -------2
  34. * 1
  35. * -------3
  36. * 如1后面有2、3
  37. */
  38. private List<WorkerWrapper<?, ?>> nextWrappers;
  39. /**
  40. * 依赖的wrappers,有2种情况,1:必须依赖的全部完成后,才能执行自己 2:依赖的任何一个、多个完成了,就可以执行自己
  41. * 通过must字段来控制是否依赖项必须完成
  42. * 1
  43. * -------3
  44. * 2
  45. * 1、2执行完毕后才能执行3
  46. */
  47. private List<DependWrapper> dependWrappers;
  48. /**
  49. * 标记该事件是否已经被处理过了,譬如已经超时返回false了,后续rpc又收到返回值了,则不再二次回调
  50. * 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取
  51. * <p>
  52. * 1-finish, 2-error, 3-working
  53. */
  54. private AtomicInteger state = new AtomicInteger(0);
  55. /**
  56. * 该map存放所有wrapper的id和wrapper映射
  57. */
  58. private Map<String, WorkerWrapper> forParamUseWrappers;
  59. /**
  60. * 也是个钩子变量,用来存临时的结果
  61. */
  62. private volatile WorkResult<V> workResult = WorkResult.defaultResult();
  63. /**
  64. * 是否在执行自己前,去校验nextWrapper的执行结果<p>
  65. * 1 4
  66. * -------3
  67. * 2
  68. * 如这种在4执行前,可能3已经执行完毕了(被2执行完后触发的),那么4就没必要执行了。
  69. * 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的
  70. */
  71. private volatile boolean needCheckNextWrapperResult = true;
  72. private static final int FINISH = 1;
  73. private static final int ERROR = 2;
  74. private static final int WORKING = 3;
  75. private static final int INIT = 0;
  76. private WorkerWrapper(String id, IWorker<T, V> worker, T param, ICallback<T, V> callback) {
  77. if (worker == null) {
  78. throw new NullPointerException("async.worker is null");
  79. }
  80. this.worker = worker;
  81. this.param = param;
  82. this.id = id;
  83. //允许不设置回调
  84. if (callback == null) {
  85. callback = new DefaultCallback<>();
  86. }
  87. this.callback = callback;
  88. }
  89. /**
  90. * 开始工作
  91. * fromWrapper代表这次work是由哪个上游wrapper发起的
  92. */
  93. private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
  94. this.forParamUseWrappers = forParamUseWrappers;
  95. //将自己放到所有wrapper的集合里去
  96. forParamUseWrappers.put(id, this);
  97. long now = SystemClock.now();
  98. //总的已经超时了,就快速失败,进行下一个
  99. if (remainTime <= 0) {
  100. fastFail(INIT, null);
  101. beginNext(executorService, now, remainTime);
  102. return;
  103. }
  104. //如果自己已经执行过了。
  105. //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
  106. if (getState() == FINISH || getState() == ERROR) {
  107. beginNext(executorService, now, remainTime);
  108. return;
  109. }
  110. //如果在执行前需要校验nextWrapper的状态
  111. if (needCheckNextWrapperResult) {
  112. //如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了
  113. if (!checkNextWrapperResult()) {
  114. fastFail(INIT, new SkippedException());
  115. beginNext(executorService, now, remainTime);
  116. return;
  117. }
  118. }
  119. //如果没有任何依赖,说明自己就是第一批要执行的
  120. if (dependWrappers == null || dependWrappers.size() == 0) {
  121. fire();
  122. beginNext(executorService, now, remainTime);
  123. return;
  124. }
  125. /*如果有前方依赖,存在两种情况
  126. 一种是前面只有一个wrapper。即 A -> B
  127. 一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。
  128. 所以需要B来做判断,必须A、C、D都完成,自己才能执行 */
  129. //只有一个依赖
  130. if (dependWrappers.size() == 1) {
  131. doDependsOneJob(fromWrapper);
  132. beginNext(executorService, now, remainTime);
  133. } else {
  134. //有多个依赖时
  135. doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
  136. }
  137. }
  138. public void work(ExecutorService executorService, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
  139. work(executorService, null, remainTime, forParamUseWrappers);
  140. }
  141. /**
  142. * 总控制台超时,停止所有任务
  143. */
  144. public void stopNow() {
  145. if (getState() == INIT || getState() == WORKING) {
  146. fastFail(getState(), null);
  147. }
  148. }
  149. /**
  150. * 判断自己下游链路上,是否存在已经出结果的或已经开始执行的
  151. * 如果没有返回true,如果有返回false
  152. */
  153. private boolean checkNextWrapperResult() {
  154. //如果自己就是最后一个,或者后面有并行的多个,就返回true
  155. if (nextWrappers == null || nextWrappers.size() != 1) {
  156. return getState() == INIT;
  157. }
  158. WorkerWrapper nextWrapper = nextWrappers.get(0);
  159. boolean state = nextWrapper.getState() == INIT;
  160. //继续校验自己的next的状态
  161. return state && nextWrapper.checkNextWrapperResult();
  162. }
  163. /**
  164. * 进行下一个任务
  165. */
  166. private void beginNext(ExecutorService executorService, long now, long remainTime) {
  167. //花费的时间
  168. long costTime = SystemClock.now() - now;
  169. if (nextWrappers == null) {
  170. return;
  171. }
  172. if (nextWrappers.size() == 1) {
  173. nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
  174. return;
  175. }
  176. CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
  177. for (int i = 0; i < nextWrappers.size(); i++) {
  178. int finalI = i;
  179. futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
  180. .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
  181. }
  182. try {
  183. CompletableFuture.allOf(futures).get(remainTime - costTime, TimeUnit.MILLISECONDS);
  184. } catch (Exception e) {
  185. e.printStackTrace();
  186. }
  187. }
  188. private void doDependsOneJob(WorkerWrapper dependWrapper) {
  189. if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
  190. workResult = defaultResult();
  191. fastFail(INIT, null);
  192. } else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
  193. workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
  194. fastFail(INIT, null);
  195. } else {
  196. //前面任务正常完毕了,该自己了
  197. fire();
  198. }
  199. }
  200. private synchronized void doDependsJobs(ExecutorService executorService, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
  201. //如果当前任务已经完成了,依赖的其他任务拿到锁再进来时,不需要执行下面的逻辑了。
  202. if (getState() != INIT) {
  203. return;
  204. }
  205. boolean nowDependIsMust = false;
  206. //创建必须完成的上游wrapper集合
  207. Set<DependWrapper> mustWrapper = new HashSet<>();
  208. for (DependWrapper dependWrapper : dependWrappers) {
  209. if (dependWrapper.isMust()) {
  210. mustWrapper.add(dependWrapper);
  211. }
  212. if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
  213. nowDependIsMust = dependWrapper.isMust();
  214. }
  215. }
  216. //如果全部是不必须的条件,那么只要到了这里,就执行自己。
  217. if (mustWrapper.size() == 0) {
  218. if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
  219. fastFail(INIT, null);
  220. } else {
  221. fire();
  222. }
  223. beginNext(executorService, now, remainTime);
  224. return;
  225. }
  226. //如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干
  227. if (!nowDependIsMust) {
  228. return;
  229. }
  230. //如果fromWrapper是必须的
  231. boolean existNoFinish = false;
  232. boolean hasError = false;
  233. //先判断前面必须要执行的依赖任务的执行结果,如果有任何一个失败,那就不用走action了,直接给自己设置为失败,进行下一步就是了
  234. for (DependWrapper dependWrapper : mustWrapper) {
  235. WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
  236. WorkResult tempWorkResult = workerWrapper.getWorkResult();
  237. //为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完
  238. if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {
  239. existNoFinish = true;
  240. break;
  241. }
  242. if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
  243. workResult = defaultResult();
  244. hasError = true;
  245. break;
  246. }
  247. if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
  248. workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
  249. hasError = true;
  250. break;
  251. }
  252. }
  253. //只要有失败的
  254. if (hasError) {
  255. fastFail(INIT, null);
  256. beginNext(executorService, now, remainTime);
  257. return;
  258. }
  259. //如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working
  260. //都finish的话
  261. if (!existNoFinish) {
  262. //上游都finish了,进行自己
  263. fire();
  264. beginNext(executorService, now, remainTime);
  265. return;
  266. }
  267. }
  268. /**
  269. * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
  270. */
  271. private void fire() {
  272. //阻塞取结果
  273. workResult = workerDoJob();
  274. }
  275. /**
  276. * 快速失败
  277. */
  278. private boolean fastFail(int expect, Exception e) {
  279. //试图将它从expect状态,改成Error
  280. if (!compareAndSetState(expect, ERROR)) {
  281. return false;
  282. }
  283. //尚未处理过结果
  284. if (checkIsNullResult()) {
  285. if (e == null) {
  286. workResult = defaultResult();
  287. } else {
  288. workResult = defaultExResult(e);
  289. }
  290. }
  291. callback.result(false, param, workResult);
  292. return true;
  293. }
  294. /**
  295. * 具体的单个worker执行任务
  296. */
  297. private WorkResult<V> workerDoJob() {
  298. //避免重复执行
  299. if (!checkIsNullResult()) {
  300. return workResult;
  301. }
  302. try {
  303. //如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行
  304. if (!compareAndSetState(INIT, WORKING)) {
  305. return workResult;
  306. }
  307. callback.begin();
  308. //执行耗时操作
  309. V resultValue = worker.action(param, forParamUseWrappers);
  310. //如果状态不是在working,说明别的地方已经修改了
  311. if (!compareAndSetState(WORKING, FINISH)) {
  312. return workResult;
  313. }
  314. workResult.setResultState(ResultState.SUCCESS);
  315. workResult.setResult(resultValue);
  316. //回调成功
  317. callback.result(true, param, workResult);
  318. return workResult;
  319. } catch (Exception e) {
  320. //避免重复回调
  321. if (!checkIsNullResult()) {
  322. return workResult;
  323. }
  324. fastFail(WORKING, e);
  325. return workResult;
  326. }
  327. }
  328. public WorkResult<V> getWorkResult() {
  329. return workResult;
  330. }
  331. public List<WorkerWrapper<?, ?>> getNextWrappers() {
  332. return nextWrappers;
  333. }
  334. public void setParam(T param) {
  335. this.param = param;
  336. }
  337. private boolean checkIsNullResult() {
  338. return ResultState.DEFAULT == workResult.getResultState();
  339. }
  340. private void addDepend(WorkerWrapper<?, ?> workerWrapper, boolean must) {
  341. addDepend(new DependWrapper(workerWrapper, must));
  342. }
  343. private void addDepend(DependWrapper dependWrapper) {
  344. if (dependWrappers == null) {
  345. dependWrappers = new ArrayList<>();
  346. }
  347. //如果依赖的是重复的同一个,就不重复添加了
  348. for (DependWrapper wrapper : dependWrappers) {
  349. if (wrapper.equals(dependWrapper)) {
  350. return;
  351. }
  352. }
  353. dependWrappers.add(dependWrapper);
  354. }
  355. private void addNext(WorkerWrapper<?, ?> workerWrapper) {
  356. if (nextWrappers == null) {
  357. nextWrappers = new ArrayList<>();
  358. }
  359. //避免添加重复
  360. for (WorkerWrapper wrapper : nextWrappers) {
  361. if (workerWrapper.equals(wrapper)) {
  362. return;
  363. }
  364. }
  365. nextWrappers.add(workerWrapper);
  366. }
  367. private void addNextWrappers(List<WorkerWrapper<?, ?>> wrappers) {
  368. if (wrappers == null) {
  369. return;
  370. }
  371. for (WorkerWrapper<?, ?> wrapper : wrappers) {
  372. addNext(wrapper);
  373. }
  374. }
  375. private void addDependWrappers(List<DependWrapper> dependWrappers) {
  376. if (dependWrappers == null) {
  377. return;
  378. }
  379. for (DependWrapper wrapper : dependWrappers) {
  380. addDepend(wrapper);
  381. }
  382. }
  383. private WorkResult<V> defaultResult() {
  384. workResult.setResultState(ResultState.TIMEOUT);
  385. workResult.setResult(worker.defaultValue());
  386. return workResult;
  387. }
  388. private WorkResult<V> defaultExResult(Exception ex) {
  389. workResult.setResultState(ResultState.EXCEPTION);
  390. workResult.setResult(worker.defaultValue());
  391. workResult.setEx(ex);
  392. return workResult;
  393. }
  394. private int getState() {
  395. return state.get();
  396. }
  397. public String getId() {
  398. return id;
  399. }
  400. private boolean compareAndSetState(int expect, int update) {
  401. return this.state.compareAndSet(expect, update);
  402. }
  403. private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
  404. this.needCheckNextWrapperResult = needCheckNextWrapperResult;
  405. }
  406. @Override
  407. public boolean equals(Object o) {
  408. if (this == o) {
  409. return true;
  410. }
  411. if (o == null || getClass() != o.getClass()) {
  412. return false;
  413. }
  414. WorkerWrapper<?, ?> that = (WorkerWrapper<?, ?>) o;
  415. return needCheckNextWrapperResult == that.needCheckNextWrapperResult &&
  416. Objects.equals(param, that.param) &&
  417. Objects.equals(worker, that.worker) &&
  418. Objects.equals(callback, that.callback) &&
  419. Objects.equals(nextWrappers, that.nextWrappers) &&
  420. Objects.equals(dependWrappers, that.dependWrappers) &&
  421. Objects.equals(state, that.state) &&
  422. Objects.equals(workResult, that.workResult);
  423. }
  424. @Override
  425. public int hashCode() {
  426. return Objects.hash(param, worker, callback, nextWrappers, dependWrappers, state, workResult, needCheckNextWrapperResult);
  427. }
  428. public static class Builder<W, C> {
  429. /**
  430. * 该wrapper的唯一标识
  431. */
  432. private String id = UUID.randomUUID().toString();
  433. /**
  434. * worker将来要处理的param
  435. */
  436. private W param;
  437. private IWorker<W, C> worker;
  438. private ICallback<W, C> callback;
  439. /**
  440. * 自己后面的所有
  441. */
  442. private List<WorkerWrapper<?, ?>> nextWrappers;
  443. /**
  444. * 自己依赖的所有
  445. */
  446. private List<DependWrapper> dependWrappers;
  447. /**
  448. * 存储强依赖于自己的wrapper集合
  449. */
  450. private Set<WorkerWrapper<?, ?>> selfIsMustSet;
  451. private boolean needCheckNextWrapperResult = true;
  452. public Builder<W, C> worker(IWorker<W, C> worker) {
  453. this.worker = worker;
  454. return this;
  455. }
  456. public Builder<W, C> param(W w) {
  457. this.param = w;
  458. return this;
  459. }
  460. public Builder<W, C> id(String id) {
  461. if (id != null) {
  462. this.id = id;
  463. }
  464. return this;
  465. }
  466. public Builder<W, C> needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
  467. this.needCheckNextWrapperResult = needCheckNextWrapperResult;
  468. return this;
  469. }
  470. public Builder<W, C> callback(ICallback<W, C> callback) {
  471. this.callback = callback;
  472. return this;
  473. }
  474. public Builder<W, C> depend(WorkerWrapper<?, ?>... wrappers) {
  475. if (wrappers == null) {
  476. return this;
  477. }
  478. for (WorkerWrapper<?, ?> wrapper : wrappers) {
  479. depend(wrapper);
  480. }
  481. return this;
  482. }
  483. public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper) {
  484. return depend(wrapper, true);
  485. }
  486. public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper, boolean isMust) {
  487. if (wrapper == null) {
  488. return this;
  489. }
  490. DependWrapper dependWrapper = new DependWrapper(wrapper, isMust);
  491. if (dependWrappers == null) {
  492. dependWrappers = new ArrayList<>();
  493. }
  494. dependWrappers.add(dependWrapper);
  495. return this;
  496. }
  497. public Builder<W, C> next(WorkerWrapper<?, ?> wrapper) {
  498. return next(wrapper, true);
  499. }
  500. public Builder<W, C> next(WorkerWrapper<?, ?> wrapper, boolean selfIsMust) {
  501. if (nextWrappers == null) {
  502. nextWrappers = new ArrayList<>();
  503. }
  504. nextWrappers.add(wrapper);
  505. //强依赖自己
  506. if (selfIsMust) {
  507. if (selfIsMustSet == null) {
  508. selfIsMustSet = new HashSet<>();
  509. }
  510. selfIsMustSet.add(wrapper);
  511. }
  512. return this;
  513. }
  514. public Builder<W, C> next(WorkerWrapper<?, ?>... wrappers) {
  515. if (wrappers == null) {
  516. return this;
  517. }
  518. for (WorkerWrapper<?, ?> wrapper : wrappers) {
  519. next(wrapper);
  520. }
  521. return this;
  522. }
  523. public WorkerWrapper<W, C> build() {
  524. WorkerWrapper<W, C> wrapper = new WorkerWrapper<>(id, worker, param, callback);
  525. wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
  526. if (dependWrappers != null) {
  527. for (DependWrapper workerWrapper : dependWrappers) {
  528. workerWrapper.getDependWrapper().addNext(wrapper);
  529. wrapper.addDepend(workerWrapper);
  530. }
  531. }
  532. if (nextWrappers != null) {
  533. for (WorkerWrapper<?, ?> workerWrapper : nextWrappers) {
  534. boolean must = false;
  535. if (selfIsMustSet != null && selfIsMustSet.contains(workerWrapper)) {
  536. must = true;
  537. }
  538. workerWrapper.addDepend(wrapper, must);
  539. wrapper.addNext(workerWrapper);
  540. }
  541. }
  542. return wrapper;
  543. }
  544. }
  545. }