实现计算单元间的串行并行编排,通过异步回调解耦,支持任务组超时限制、条件触发和计算结果传递。
多任务串行、并行编排:
1,具有依赖关系的任务组编排。比如a-->(b-->c, d-->e-->f)-->g ),a触发b和d,b触发c,d触发e,e触发f,最后c,f共同触发g;
2,前置任务可以设置强制依赖和非强制依赖。强制依赖任务必须全部正常返回才会触发后续任务,非强制依赖不对后续触发产生影响。比如(optional(a, b),must(c, d))-->e,a,b为强制依赖必须全部正常完成才能触发e,非强制依赖c,d不对e的触发产生影响;
3,任务组间线程隔离。
任务回调:每个任务都可以有回调,用于实现监控、日志、设置默认值、异常处理等功能;
短路特性:当设置任一前置任务执行成功即触发后续任务时,如果某一前置任务成功执行,即触发后续任务,其余前置任务将被跳过不再执行。例如any(a, b-->c, d-->e-->f)-->g中任务a成功执行触发g,其余前置任务支路b-->c, d-->e-->f将被短路,跳过不执行;
可以为任务组设置超时限制,超时后还未被执行的任务将不会被处理;
上游任务结果依赖:同一个任务组内下游任务可以获取任一上游任务计算结果作为自己的输入;
无锁编程、线程资源复用。
Worker:最基本计算单元,无状态节点,执行计算任务,支持设置默认计算结果。
Callback:为Worker执行前置准备和后置回调操作,无状态节点,通过异步回调避免任务调用方阻塞。
WorkerWrapper:对每个worker及callback进行一对一包装。有状态节点,保存包装节点运行状态、前置依赖节点列表、后续调用计算节点列表、计算节点输入参数和输出返回值。同时决定当前任务节点是否被触发,并发起对后续计算任务节点的调用。
Async:任务执行入口,传入任务组起始节点,并控制任务组超时。
WorkerWorker接受输入参数,同时可以通过共享容器获取到其它计算节点的返回值,最后将自己的计算结果放入共享容器中
xxxxxxxxxx11012public interface IWorker<T, V> {3 // 计算任务4 V action(T object, Map<String, WorkerWrapper> allWrappers);5 // 设置默认值,用于计算节点未正常结束时的返回值6 default V defaultValue() {7 return null;8 }9}10
CallbackCallback为Worker执行前置准备和后置回调操作,回调方法中将根据执行成功与否、输入参数与执行结果执行具体的回调逻辑。
xxxxxxxxxx11012public interface IWorker<T, V> {3 // 计算任务4 V action(T object, Map<String, WorkerWrapper> allWrappers);5 // 设置默认值,用于计算节点未正常结束时的返回值6 default V defaultValue() {7 return null;8 }9}10
WorkerWrapper对每个Worker及Callback进行一对一包装,由于Worker和Callback都是无状态类型,所以需要包装节点保存节点状态。主要处理前后节点间联系,以及当前节点与后续节点的触发执行。
xxxxxxxxxx1181public class WorkerWrapper<T, V> {2 // 唯一标识3 private String id;4 // 任务参数5 private T param;6 private IWorker<T, V> worker;7 private ICallback<T, V> callback;8 // 后续任务9 private List<WorkerWrapper<?, ?>> nextWrappers;10 // 前置任务DependWrapper=<WorkerWrapper, isMust>,11 // 有强制依赖isMust=true,和非强制依赖isMust=false之分12 private List<DependWrapper> dependWrappers;13 // 任务执行状态14 // 0-init, 1-finish, 2-error, 3-working15 private AtomicInteger state = new AtomicInteger(0);16 // 存放所有节点计算结果17 private Map<String, WorkerWrapper> forParamUseWrappers;18}
当前任务组未超时+当前任务未被执行+当前任务支线有执行的必要+当前任务的所有强制依赖已全部完成时,当前任务将会触发执行。
超时限制:可以为整个任务组设置时间限制,从源头任务开始计时,随着任务的执行运行时间逐渐增加,超时控制分为任务内部,以及任务外部。
在任务执行内部,每次执行当前任务前需要检查是否超时,如果超时将跳过当前任务,继续执行后续处理。
xxxxxxxxxx1181/**2 * 执行当前任务3 * executorService:执行任务的线程池4 * fromWrapper:代表当前work是由上游哪个wrapper发起5 * remainTime: 当前任务组剩余执行时间6 */7private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {8 long now = SystemClock.now();9 // 执行当前任务前超时检查:当前任务组已超时,跳过执行,快速失败,进行后续节点调用10 if (remainTime <= 0) {11 // 设置默认值并执行回调12 fastFail(INIT, null);13 beginNext(executorService, now, remainTime);14 return;15 }16 dosomething()17}18
同时执行完当前任务处理,触发后续任务处理时,在后续任务提交执行后,开启超时等待,超时后将不再等待后续任务返回。
xxxxxxxxxx1191private void beginNext(ExecutorService executorService, long now, long remainTime) {2 // 已经花费的时间3 long costTime = SystemClock.now() - now;4 // 后续任务提交线程池异步执行,由于后续任务存在多条支路,需要在外部统一控制超时5 CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];6 for (int i = 0; i < nextWrappers.size(); i++) {7 int finalI = i;8 futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)9 .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers),10 executorService);11 }12 try {13 // 任务外部控制器:超时等待所有任务在指定时限内返回,超时后将不再等待,14 // 注意这里未强制中断超时任务15 CompletableFuture.allOf(futures).get(remainTime - costTime, TimeUnit.MILLISECONDS);16 } catch (Exception e) {17 e.printStackTrace();18 }19}
执行状态检查:由上述的超时控制可知,任务执行内部只在执行前检查,无法在任务进行中实时监测超时状态。同时任务外部控制器的超时等待超时后,将不再继续等待,直接继续进行下一阶段任务处理。正在执行的超时任务未被强制终止,仍然有可能正常返回,再次触发下一阶段任务处理。所以下一阶段任务将可能被多次触发,需要额外处理,防止任务被重复触发。
xxxxxxxxxx1101private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {2 dosomething()3 // 如果当前节点已经执行过,则不再执行4 if (getState() == FINISH || getState() == ERROR) {5 beginNext(executorService, now, remainTime);6 return;7 }8 dosomething()9}10
执行必要性判断:对于存在多个可选前置依赖的任务,如果它已被某个依赖触发,则其他前置依赖支线任务没必要再执行。如任务组optional(a->b->c,d->e)->f中任务f可以被支路a->b->c触发,也可以被d->e触发。当任务f被e触发后,任务支线a->b->c没继续执行必要。
xxxxxxxxxx1111private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {2 dosomething()3 // 如果当前任务的next链上有已经出结果或有已经开始执行的任务,当前任务不用继续执行4 // 短路特性,后续任务已被其它节点触发,不再执行当前支路任务5 if (!checkNextWrapperResult()) {6 fastFail(INIT, new SkippedException());7 beginNext(executorService, now, remainTime);8 return;9 }10 dosomething()11}
checkNextWrapperResult方法将从当前节点开始以DFS 方式遍历后续任务,如果出现已经出结果或有已经开始执行的任务,证明当前任务不用继续执行
xxxxxxxxxx1161
2/**3 * 判断自己下游链路上,是否存在已经出结果的或已经开始执行的节点4 * 上述判断只针对后续任务数=1等情形成立(如a->b->c)5 */6private boolean checkNextWrapperResult() {7 // 如果当前任务就是最后一个,或者后面有并行的多个后置任务,表示当前节点必须被执行8 if (nextWrappers == null || nextWrappers.size() != 1) {9 return getState() == INIT;10 }11 // 查看当前节点是否未被执行12 WorkerWrapper nextWrapper = nextWrappers.get(0);13 boolean state = nextWrapper.getState() == INIT;14 // DFS方式继续校验自后续节点执行状态,保证后续链路任务都未被执行15 return state && nextWrapper.checkNextWrapperResult();16}
强制依赖检查:只有当前结点是任务组的最初始节点,或者全部强制依赖已正常完成,才能证明当前节点以满足执行条件,才会开始执行。
xxxxxxxxxx1181private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {2 dosomething()3 // 如果没有任何依赖,说明自己就是起始任务4 if (dependWrappers == null || dependWrappers.size() == 0) {5 // 执行当前任务6 fire();7 beginNext(executorService, now, remainTime);8 return;9 }10 // 只有一个依赖,则说明当前任务的全部依赖已运行完毕,需要进一步判断依赖是否正常结束11 if (dependWrappers.size() == 1) {12 doDependsOneJob(fromWrapper);13 beginNext(executorService, now, remainTime);14 } else {15 // 有多个依赖时,当前节点被触发,需要检查全部必须依赖是否已正常完成16 doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);17 }18}
当只有一个强制依赖,只需该依赖任务正常完成,即可执行当前任务;
xxxxxxxxxx1171/**2 * 如果只有一个强制依赖,只需该依赖正常完成3 */4private void doDependsOneJob(WorkerWrapper dependWrapper) {5 // 唯一前置依赖节点处于超时或者异常态,则当前任务支线异常,跳过当前任务6 if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {7 workResult = defaultResult();8 fastFail(INIT, null);9 } else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {10 workResult = defaultExResult(dependWrapper.getWorkResult().getEx());11 fastFail(INIT, null);12 } else {13 // 唯一前置依赖节点正常执行,执行当前节点14 fire();15 }16}17
如果存在多个依赖,需要保证触发当前任务的前置任务必须是强制依赖,全部强制依赖才有可能已完成,之后再进行进一步检测强制依赖是否都正常完成。
同时需要对该方法加锁串行化访问,防止在存在多个依赖时,可能存在多个依赖同时触发当前任务执行判断,所以需要加锁串行化访问,防止并发读写,同时当前任务只等待其最后一个强制依赖完成,所以其他依赖触发doDependsJobs是无意义的,也没有并行化的意义。
xxxxxxxxxx1701/**2 * 如果存在多个强制依赖,需要保证触发当前任务的前置任务必须是强制依赖,之后再检查依赖是否都正常完成3 */4private synchronized void doDependsJobs(ExecutorService executorService, List<DependWrapper> dependWrappers,WorkerWrapper fromWrapper, long now, long remainTime) {5 // 加锁序列化访问,用于多个依赖同时触发下一节点时的情形6 // 如果当前节点已被其他依赖触发,则无需重复执行7 if (getState() != INIT) {8 return;9 }10 // 只有触发触发当前任务的前置节点fromWrapper是当前节点的必须依赖时,才有可能全部强制依赖已完成11 boolean nowDependIsMust = false;12 // 必须完成的上游wrapper集合13 Set<DependWrapper> mustWrapper = new HashSet<>();14 for (DependWrapper dependWrapper : dependWrappers) {15 if (dependWrapper.isMust()) {16 mustWrapper.add(dependWrapper);17 }18 if (dependWrapper.getDependWrapper().equals(fromWrapper)) {19 nowDependIsMust = dependWrapper.isMust();20 }21 }22
23 // 前置节点fromWrapper不是当前节点的必须依赖,且存在强制依赖24 // 则可能存在未完成强制依赖,直接返回25 if (!nowDependIsMust) {26 return;27 }28
29 // 如果前置fromWrapper是必须的,则有可能触发当前节点,具体取决于全部前置依赖是否完成30 boolean existNoFinish = false;31 boolean hasError = false;32
33 // 判断必须要执行的依赖任务的执行结果,如果有任何一个超时或者异常,证明当前任务不会被触发,直接跳过当前任务34 for (DependWrapper dependWrapper : mustWrapper) {35 WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();36 WorkResult tempWorkResult = workerWrapper.getWorkResult();37 // 未完成38 if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {39 existNoFinish = true;40 break;41 }42 // 超时43 if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {44 workResult = defaultResult();45 hasError = true;46 break;47 }48 // 异常49 if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {50 workResult = defaultExResult(workerWrapper.getWorkResult().getEx());51 hasError = true;52 break;53 }54
55 }56
57 // 如果有任何一个超时或者异常,证明当前任务不会被触发,直接跳过当前任务58 if (hasError) {59 fastFail(INIT, null);60 beginNext(executorService, now, remainTime);61 return;62 }63
64 // 强制依赖全部正常完成,不存在未执行或正在执行的节点,则当前节点被触发65 if (!existNoFinish) {66 fire();67 beginNext(executorService, now, remainTime);68 return;69 }70}
任务的执行分为三部分:前置准备+任务调用+后置回调。在执行的最开始以及回调前,通过CAS将节点的状态设置为WORKING以及FINISH。后续如果再被触发,将通过状态判断出已执行,提前返回,防止被重复执行。通过CAS方式避免加锁,避免不需要的线程竞争。
为什么需要CAS并发写入控制:上面的任务触发部分已经保证每个节点只会被执行一次,理论上不需要通过CAS并发控制,但是有两种特殊情况将导致并发修改节点状态:
1,某个任务导致任务组超时,此时外部超时控制机制将执行所有节点的快速失败方法,将修改节点状态;同时该超时任务并未结束,将可能导致并发写入节点状态。
2,某个任务导致任务组超时,此时外部超时控制机制将执行所有节点的快速失败方法,将修改节点状态;同时该超时任务成功结束,触发后续节点的执行,后续节点的work方法将发现任务组已超时,执行后续节点的快速失败方法,后续节点失败方法可能并发执行,导致并发写入节点状态。
xxxxxxxxxx1191private WorkResult<V> workerDoJob() {2 try {3 // 将当前状态通过CAS方式从INIT改为WORKING4 if (!compareAndSetState(INIT, WORKING)) {5 return workResult;6 }7 // 前置任务8 callback.begin();9 V resultValue = worker.action(param, forParamUseWrappers);10 // 将当前状态通过CAS方式从WORKING改为FINISH11 if (!compareAndSetState(WORKING, FINISH)) {12 return workResult;13 }14 // 任务回调15 callback.result(true, param, workResult);16 return workResult;17 } catch (Exception e) {18 }19}
当前任务执行完成后需要触发后续任务的执行,根据后续任务的数量可写分为:只有一个后续任务,则复用当前任务线程;如果存在多个后续任务,则将他们交由线程池异步执行,当前任务线程将执行外部超时控制逻辑,等待至超时时限后返回。
xxxxxxxxxx1201private void beginNext(ExecutorService executorService, long now, long remainTime) {2 // 如果只有一个后置任务,直接使用当前线程执行,再后续任务中执行超时控制3 if (nextWrappers.size() == 1) {4 nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);5 return;6 }7 // 如果存在多个后续任务,线程池异步执行多个任务,同时当前线程在任务外部统一控制超时8 CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];9 for (int i = 0; i < nextWrappers.size(); i++) {10 int finalI = i;11 futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)12 .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers),13 executorService);14 }15 try {16 // 等待全部支路任务在时限务返回17 CompletableFuture.allOf(futures).get(remainTime - costTime, TimeUnit.MILLISECONDS);18 } catch (Exception e) {19 }20}
需要注意的是,不是只有当前任务正常执行后才需要触发后续任务的执行。前置依赖任务超时、异常、当前任务执行前已超时、当前任务被短路等情形下,当前任务将不会执行,在为当前任务执行快速失败后,仍将执行后续任务的触发,此时后续任务同样不会真正被执行,只会执行他们的快速失败方法,这主要是为当前任务和后续任务设置默认值,并以默认值执行回调,保证任务组执行的完整。
这里同样需要CAS并发写入控制,理由同workerDoJob。
xxxxxxxxxx1171private boolean fastFail(int expect, Exception e) {2 // 试图将它从expect状态,改成Error3 // 同样需要`CAS`并发写入控制,理由同上。4 if (!compareAndSetState(expect, ERROR)) {5 return false;6 }7 // 计算结果仍未默认值空值8 if (checkIsNullResult()) {9 if (e == null) {10 workResult = defaultResult();11 } else {12 workResult = defaultExResult(e);13 }14 }15 callback.result(false, param, workResult);16 return true;17}
Async任务组启动的入口为Async的beginWork方法,将源头任务交由线程池执行,开启整个任务组的执行。beginWork方法与beginNext方法较为类似,但最大不同点在于启动任务组的线程会作为超时控制线程,当设置的超时时间到达后,为所有未执行或者正在执行的WorkerWrapper执行fastFail快速失败,设置默认值及进行回调,同时未被执行任务将不会被执行,正在执行的任务也不会被强制中断,效果类似线程池的stop方法。
xxxxxxxxxx1221public static boolean beginWork(long timeout, ExecutorService executorService, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException {2 // 将任务组起始任务交由线程池异步执行3 CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];4 for (int i = 0; i < workerWrappers.size(); i++) {5 WorkerWrapper wrapper = workerWrappers.get(i);6 futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService);7 }8 try {9 // 启动任务组的线程会作为超时控制线程,等待超时时间到达10 CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);11 return true;12 } catch (TimeoutException e) {13 Set<WorkerWrapper> set = new HashSet<>();14 totalWorkers(workerWrappers, set);15 // 为所有未执行或者正在执行的`WorkerWrapper`执行`fastFail`快速失败16 // 设置默认值及进行回调17 for (WorkerWrapper wrapper : set) {18 wrapper.stopNow();19 }20 return false;21 }22}
计算任务短路部分,判断只针对后续任务数等于1的情形成立(如a->b->c),如果后续任务数大于1,则该任务将被执行。可能作者认为一个任务被多个后续任务依赖,具有较高的重要性,所以只要该任务未被执行就默认将其执行,忽略短路特性。
我认为即使当前任务被多个后续任务依赖,多个后续任务是存在都被触发的情形,所以是可以被短路跳过的。可以修改检查是否需要短路的方法,DFS方式遍历以当前节点为起点的依赖图,如果出现未被触发的任务,证明当前任务由执行的必要,否则当前任务可以跳过。
xxxxxxxxxx1171private boolean checkNextWrapperResultDFS() {2 // 当前节点状态3 if (getState() == INIT) {4 return true;5 }6 // 无后续节点且当前已被执行7 if (nextWrappers == null) {8 return false;9 }10 // 查看是否存在未被执行后续依赖图节点11 for (WorkerWrapper nextWrapper : nextWrappers) {12 if (nextWrapper.checkNextWrapperResultDFS()) {13 return true;14 }15 }16 return false;17}
为什么任务组超时后只是跳过尚未执行任务,正在执行的任务仍然可以继续执行。我们可以通过在外部发起interrupt调用,线程内部使用isInterrupted检查中断状态,实现强制中断任务。但是长时间的任务一般是请求接口,强制中断的意义不大,同时isInterrupted额外增加复杂度。
线程池复用线程导致的ThradLocal数据污染问题。使用TransmittableThreadLocal,当从父线程提交任务到线程池时,TransmittableThreadLocal会捕获在父线程中设置的变量副本,并确保这些副本在子线程中可用,即使这些子线程可能是由线程池中的现有线程执行的。
状态并发读取问题:但计算任务完成后先设置状态为完成完成,再设置返回值。先设置标志位后设置结果方式将带来并发读写问题。
x
1private WorkResult<V> workerDoJob() {2 // 执行耗时操作3 V resultValue = worker.action(param, forParamUseWrappers);4 // 标志位:当前节点已完成5 if (!compareAndSetState(WORKING, FINISH)) {6 return workResult;7 }8 // 写入结果9 workResult.setResultState(ResultState.SUCCESS)10 workResult.setResult(resultValue);11 // 任务回调12 callback.result(true, param, workResult)13}例如must(a,b)->c任务中,a,b必须同时完成才会触发c,当a完成并进入c的触发判断时,b刚执行完worker.action并设置节点状态为已完成,此时c读取b状态发现其已完成,则c满足触发条件,进入action方法,并读取b的结果,此时b尚未将结果写入,导致c读取错误值。
可以调整顺序,先设置结果,再执行回调,最后设置状态。修改后上述情形下,c将不满足触发条件,c最终由b触发。
x
9131private WorkResult<V> workerDoJob() {2 // 执行耗时操作3 V resultValue = worker.action(param, forParamUseWrappers);4 // 写入结果5 workResult.setResultState(ResultState.SUCCESS);6 workResult.setResult(resultValue);7 // 任务回调8 callback.result(true, param, workResult)9 // 标志位:当前节点已完成10 if (!compareAndSetState(WORKING, FINISH)) {11 return workResult;12 }13}