Guava EvenBus源码解读发布订阅模型Guava EventBus概述事件注册registerfindAllSubscribersgetAnnotatedMethodsNotCachedSubscriber事件分发postgetSubscribersflattenHierarchyCache事件执行perThreadDispatchLegacyAsyncDispatcher订阅者执行顺序线程执行线程池并发模型优缺点Spring Event对比工作流程过滤特性顺序特性参考
通过发布订阅中心建立事件发布者和事件订阅者间的通信体系。发布者和订阅者不直接通信,而是将要发布的消息交 由发布订阅中心管理,订阅者按需订阅中心中的消息,订阅者将在事件发生时被事件处理中心主动唤醒。
发布者的发布动作和订阅者的订阅动作相互独立,消息派发由发布订阅中心负责,生产者、订阅者完全解耦的。

EventBus为轻量级、进程内事件驱动组件,提供同步以及异步方式的事件驱动机制。

组成:EventBus 、AsyncEventBus提供同步和异步的事件处理机制;Event、DeadEvent事件和无订阅者的事件;Subscriber、SynchronizedSubscriber根据handler方法生成的线程不安全订阅者和线程安全订阅者;perThreadDispatcher, legacyAsyncDispatcher线程独立派发器和线程共享派发器,用于派发任务给Subscriber;SubscriberRegistry订阅者注册器,用户根据Listener生成Subscriber并注入到容器中。
工作流程:定义事件订阅者Listener,包含事件发生时要执行的操作handler。将Listener注册到EventBus时,将每个handler封装成Subscriber对象,加入所等待事件的集合Subscribers中。当某事件发生时,等待在该事件集合Subscribers中的Subscriber对象将被调用,完成事件的处理。


register注册方法register:构建缓存<key=event.class, value=[Subscriber(handler)]>,将listener及其父类定义的handler封装成Subscriber对象,加入所等待事件的集合Subscribers中。
x1void register(Object listener) {2 // 将listener及其父类定义的全部handler方法包换成Subscriber类型,3 // 并以Map形式返回,key为等待的事件类型,value为等待该事件的多个hadler构成的Subscriber set集合4 // <key=envnt.class, value=[Subscriber(handler)]>5 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);6
7 for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {8 Class<?> eventType = entry.getKey();9 Collection<Subscriber> eventMethodsInListener = entry.getValue();10
11 // 将新加入的Subscriber添加到所等待事件对应的Subscribers集合中12 // CopyOnWrite集合通过读写分离,写操作同步保证并发写入安全13 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);14 // 该事件未被注册过,创建新的Subscribers集合15 if (eventSubscribers == null) {16 CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();17 // putIfAbsent: 不存在才添加; firstNonNull:获得参数列表中第一个非null值18 // 双重检查防止两个线程同时发现event未注册,一个线程完成注册后,另一个线程用新的set覆盖,导致第一个线程注册Subscriber丢失19 eventSubscribers =20 MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);21 }22
23 eventSubscribers.addAll(eventMethodsInListener);24 }25}
findAllSubscribers获取注册listener及其父类定义的所有handler方法,并以<key=event.class, value=[Subscriber(handler)]>形式返回。
xxxxxxxxxx111private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {2 Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();3 Class<?> clazz = listener.getClass();4 // 将listener及其父类的全部handler方法构建<envent.class,[Subscriber(handler)]>5 for (Method method : getAnnotatedMethods(clazz)) {6 Class<?>[] parameterTypes = method.getParameterTypes();7 Class<?> eventType = parameterTypes[0];8 methodsInListener.put(eventType, Subscriber.create(bus, listener, method));9 }10 return methodsInListener;11}
getAnnotatedMethodsNotCachedgetAnnotatedMethods方法将从<listener.class,[Method]>缓存subscriberMethodsCache获取当前listener对象及其父类所有标注有@Subscribe的handler方法。
xxxxxxxxxx181private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {2 try {3 return subscriberMethodsCache.getUnchecked(clazz);4 } catch (UncheckedExecutionException e) {5 throwIfUnchecked(e.getCause());6 throw e;7 }8}9private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =10 CacheBuilder.newBuilder()11 .weakKeys()12 .build(13 new CacheLoader<Class<?>, ImmutableList<Method>>() {14 15 public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {16 return getAnnotatedMethodsNotCached(concreteClass);17 }18 });
subscriberMethodsCache声明为static变量,作为缓存只会在类加载时被加载一次,在类加载时会触发初始化方法getAnnotatedMethodsNotCached
xxxxxxxxxx211private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {2 // 获得listener自身的class、以及所有父类黑和接口3 // 返回值中子类在父类之前,只有子类的hanlder方法被注册,保障事件触发时,调用的是子类方法,保证多态特性4 Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();5 Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();6 for (Class<?> supertype : supertypes) {7 // 自身方法,不包含继承方法8 for (Method method : supertype.getDeclaredMethods()) {9 // 查找有`@Subscribe`标注的方法,并加入<method>10 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {11 MethodIdentifier ident = new MethodIdentifier(method);12 // 保证被重写方法只加载一次,MethodIdentifier使用(methodName,parmList)作为唯一标识13 // 重写方法的MethodIdentifier对象相等,保证只有子类被重写方法被注册,维持多态特性14 if (!identifiers.containsKey(ident)) {15 identifiers.put(ident, method);16 }17 }18 }19 }20 return ImmutableList.copyOf(identifiers.values());21}
SubscriberSubscriber创建时将根据handler方法是否有@AllowConcurrentEvents,创建为普通Subscriber或者线程安全的SynchronizedSubscriber,二者为继承关系,唯一区别是SynchronizedSubscriber执行时会加锁,保障线程安全。二者在线程池中通过反射方式执行持有的listener的handler方法。
xxxxxxxxxx391class Subscriber {2 private final Executor executor;3 4 private Subscriber(EventBus bus, Object target, Method method) {5 this.bus = bus;6 this.target = checkNotNull(target);7 this.method = method;8 method.setAccessible(true);9 this.executor = bus.executor();10 }11 12 final void dispatchEvent(Object event) {13 // 在线程池中通过反射方式执行持有的`listener`的`handler`方法。14 executor.execute(15 () -> {16 try {17 invokeSubscriberMethod(event);18 } catch (InvocationTargetException e) {19 bus.handleSubscriberException(e.getCause(), context(event));20 }21 });22 }23 void invokeSubscriberMethod(Object event) throws InvocationTargetException {24 try {25 method.invoke(target, checkNotNull(event));26 } 27 // catch exception28 }29
30 static final class SynchronizedSubscriber extends Subscriber {31 32 void invokeSubscriberMethod(Object event) throws InvocationTargetException {33 // 加锁保证线程安全34 synchronized (this) {35 super.invokeSubscriberMethod(event);36 }37 }38 }39}

post当向EventBus投递事件时,将找到<key=event.class, value=[Subscriber(handler)]>缓存中,用于处理对应事件及父类事件的Subscriber,进行事件处理。
xxxxxxxxxx121public void post(Object event) {2 // 对应事件及父类事件的`Subscriber`3 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);4 if (eventSubscribers.hasNext()) {5 // 分发任务,完成对应Subscriber调用6 dispatcher.dispatch(event, eventSubscribers);7 } else if (!(event instanceof DeadEvent)) {8 // 无对应Subscriber的事件已DeadEvent重新发布9 // 可以为DeadEvent创建Subscriber兜底处理10 post(new DeadEvent(this, event));11 }12}getSubscribersgetSubscribers的获取不仅仅是对应事件的所有Subscriber,还包括当前事件所有父类的Subscriber,保证事件层级关系能正确处理。
xxxxxxxxxx171Iterator<Subscriber> getSubscribers(Object event) {2 // flattenHierarchy方法将返回当前事件类型及其父类类型,且当前类型在前,父类在后3 ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());4
5 List<Iterator<Subscriber>> subscriberIterators =6 Lists.newArrayListWithCapacity(eventTypes.size());7 // 获得当前事件和父类事件的全部订阅者8 for (Class<?> eventType : eventTypes) {9 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);10 if (eventSubscribers != null) {11 // eager no-copy snapshot12 subscriberIterators.add(eventSubscribers.iterator());13 }14 }15
16 return Iterators.concat(subscriberIterators.iterator());17}flattenHierarchyCacheflattenHierarchy方法会到<key=event.class, value=[event.class, superClass.class]>缓存flattenHierarchyCache中找到当前类及父类类型,因为是静态变量,所以在代码加载的时候就会缓存Event所有父类。
xxxxxxxxxx221static ImmutableSet<Class<?>> flattenHierarchy(Class<?> concreteClass) {2 try {3 return flattenHierarchyCache.getUnchecked(concreteClass);4 } catch (UncheckedExecutionException e) {5 throw Throwables.propagate(e.getCause());6 }7} 8private static final LoadingCache<Class<?>, ImmutableSet<Class<?>>> flattenHierarchyCache =9 CacheBuilder.newBuilder()10 .weakKeys()11 .build(12 new CacheLoader<Class<?>, ImmutableSet<Class<?>>>() {13 // <Class<?>> is actually needed to compile14 ("RedundantTypeArguments")15 16 public ImmutableSet<Class<?>> load(Class<?> concreteClass) {17 return ImmutableSet.<Class<?>>copyOf(18 // 获得concreteClass及其所有父类,子类在前,父类在后19 TypeToken.of(concreteClass).getTypes().rawTypes());20 }21 });22
SubscriberRegistry().post方法的事件分发器dispatch常见的类型可以分为两类:PerThreadQueuedDispatcher, LegacyAsyncDispatcher
perThreadDispatchEventBus默认分发器,使用DirectExecutor线程池,由提交任务的线程去执行任务,属于同步模型,线程间独立分发,线程内BFS有序处理。
线程间:为每一个调用SubscriberRegistry().post()方法的线程使用一个线程独立的队列queueForThread,缓存各自待分发的Subscriber,线程独立分发调度事件,防止多线程竞争以及容器的并发读写。
线程内:使用线程独立dispatching变量标记当前线程是否已经开始分发任务,BFS方式处理事件,只会在最顶层一处处理handler调用。同时线程内BFS方式嵌套事件处理,严格的FIFO保证线程内事件执行顺序和发布顺序一致,以及同一事件的多个Subscriber的处理顺序和注册顺序一致。
处理流程:
-->当前线程分发事件A,A的handler加入队列,设置dispatching=true,进入A的handler处理,
--> 使用DirectExecutor线程池,将由当前线程执行handler,假设handler中又分发事件B,
--> 线程内的dispatch()再次被调用于B事件分发,此时dispatching=true,事件B的handler将只是加入队列,不会继续handler处理,
--> dispatch()方法返回,继续在最顶层处理队列中handler调用,
--> 防止DFS方式不断调度运行handler,造成SOF,
--> BFS方式嵌套事件处理,处理线程在处理完事件A的所有handler后再处理B事件,以及同一事件的多个Subscriber的处理顺序和注册顺序一致。
xxxxxxxxxx441/** 存储当前线程待分发的事件,有界队列,最大值为int的最大值 */2private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() {3 4 protected Queue<Event> initialValue() {5 return Queues.newArrayDeque();6 }7};8/** 标志位标记当前线程是否已经开始分发任务,BFS处理事件,防止不断递归handler造成SOF */9private final ThreadLocal<Boolean> dispatching = new ThreadLocal<Boolean>() {10 11 protected Boolean initialValue() {12 return false;13 }14};15// 分发事件,执行hander处理逻辑16void dispatch(Object event, Iterator<Subscriber> subscribers) {17 checkNotNull(event);18 checkNotNull(subscribers);19 // 当Dispatcher被多线程调用时,如果不设置线程独立队列,将导致队列的并发读写竞争和读写错误20 // 独立线程发布事件,其调度独立,不用等待其他线程的事件处理完成21 Queue<Event> queueForThread = requireNonNull(queue.get());22 queueForThread.offer(new Event(event, subscribers));23
24 // dispatching用于表示当前线程已经开始分发任务,防止递归分发导致SOF25 // 如果事件A的handler导致了事件B的发布,B在dispatch()中在将B对应Subscriber加入队列后即返回,继续完成已有队列中handle调度26 // BFS方式嵌套事件处理:事件A的handler导致事件B被发布,处理线程在处理完事件A的所有handler后再处理B事件。27 if (!dispatching.get()) {28 dispatching.set(true);29 try {30 Event nextEvent;31 // 初始队列只有一个元素,但是Subscriber的处理可能导致新的事件被分发,队列元素增加,需要循环处理32 while ((nextEvent = queueForThread.poll()) != null) {33 while (nextEvent.subscribers.hasNext()) {34 // 分发当前事件对应的全部Subscriber35 nextEvent.subscribers.next().dispatchEvent(nextEvent.event);36 }37 }38 } finally {39 // 当前线程的全部任务分发完成,清空缓存队列40 dispatching.remove();41 queue.remove();42 }43 }44}
LegacyAsyncDispatcherAsyncEventBus默认分发器,强制自定义线程池,一般是异步执行任务(如果使用DirectExecutor线程池则是同步),多线程共享等待队列,DFS处理,事件间和事件内无执行顺序保证。
线程间:多个线程通过同一个Dispatcher分发的事件将位于同一个队列,但是ConcurrentLinkedQueue在高并发环境下,事件整体的出队顺序可能与它们发布顺序不同(非阻塞算法、并发修改、内存一致性效应、线程调度和执行延迟),事件处理顺序和发布顺序不一定一致。
线程内:异步处理事件订阅,采用DFS方式处理队列事件,如果事件A某个的handler触发了事件B的发布,线程先处理事件B所有的handler后,再返回继续处理事件A的其它handler。同样因为ConcurrentLinkedQueue在高并发环境下,出队顺序可能与它们加入队列的顺序不同,同一个事件的多个handler的执行顺序不一定和注册顺序一致。
xxxxxxxxxx191private static final class LegacyAsyncDispatcher extends Dispatcher {2
3 /** 并发安全的队列实例,用于存储多个线程post的事件和对应的订阅者,非严格FIFO */4 private final ConcurrentLinkedQueue<EventWithSubscriber> queue = Queues.newConcurrentLinkedQueue();5
6 7 void dispatch(Object event, Iterator<Subscriber> subscribers) {8 checkNotNull(event);9 // DFS方式处理handler10 // ConcurrentLinkedQueue在高并发环境下,出队顺序可能与入队列的顺序不同,非严格FIFO11 while (subscribers.hasNext()) {12 queue.add(new EventWithSubscriber(event, subscribers.next()));13 }14
15 EventWithSubscriber e;16 while ((e = queue.poll()) != null) {17 e.subscriber.dispatchEvent(e.event);18 }19}
EventBus:同一个事件的多个订阅者,谁先注册到EventBus谁先执行。如果是在同一个Listener中的多个订阅者一起被注册,收到事件的顺序跟方法名有关。
AsyncEventBus同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序上没有任何联系,在新的线程中,异步并发的执行自己的任务。
在根据方法创建订阅者时,根据方法是否标注为@AllowConcurrentEvents,创建非线程安全的Subscriber或者线程安全的SynchronizedSubscriber。
无论是Subscriber还是SynchronizedSubscriber他们的dispatchEvent()都是将任务提交到线程池执行,通过反射执行方法,而他们的线程池参数来自EventBus和AsyncEventBus。
EventBus的Executor参数默认是DirectExecutor,该线程池的作用是让提交任务的线程去执行任务,同步执行,Dispatcher参数默认为PerThreadQueuedDispatcher,线程间事件分发独立。
AsyncEventBus的Executor强制用户传入,由用户决定线程池类型,一般是异步执行,任务提交线程池后就返回,且Dispatcher参数必须为LegacyAsyncDispatcher类型,线程间共享等待队列。
xxxxxxxxxx491public class EventBus {2 // 默认是`DirectExecutor`3 private final Executor executor;4 // 默认是`PerThreadQueuedDispatcher`5 private final Dispatcher dispatcher;6}7
8public class AsyncEventBus extends EventBus {9 // `Executor`强制用户传入,`Dispatcher`参数默认为`LegacyAsyncDispatcher`类型。10 public AsyncEventBus(Executor executor) {11 super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);12 }13}14
15class Subscriber {16 // EventBus.executor17 private final Executor executor;18 private final Method method;19 final Object target;20 21 final void dispatchEvent(Object event) {22 // 将任务提交到线程池执行23 executor.execute(24 () -> {25 try {26 invokeSubscriberMethod(event);27 } catch (InvocationTargetException e) {28 bus.handleSubscriberException(e.getCause(), context(event));29 }30 });31 }32 void invokeSubscriberMethod(Object event) throws InvocationTargetException {33 //通过反射执行handler方法中的逻辑34 try {35 method.invoke(target, checkNotNull(event));36 } 37 // catch exception38 }39
40 static final class SynchronizedSubscriber extends Subscriber {41 42 void invokeSubscriberMethod(Object event) throws InvocationTargetException {43 // // 加锁保证线程安全44 synchronized (this) {45 super.invokeSubscriberMethod(event);46 }47 }48 }49}整体的并发模型由:EventBus 、AsyncEventBus;Executor;Subscriber、SynchronizedSubscriber;PerThreadQueuedDispatcher、LegacyAsyncDispatcher共同决定。
事件总线:EventBus默认使用DirectExecutor,Subscriber将在发布事件的同一个线程中被调用,同步执行。
AsyncEventBus使用自定义线程池来处理事件,允许事件处理异步进行。
线程模型:DirectExecutor为单线程处理,即使使用 AsyncEventBus,也不能实现并行处理。使用多线程线程池时,EventBus和AsyncEventBus 可以利用多个线程进行事件处理。
订阅者类型:根据方法是否标注为@AllowConcurrentEvents,创建Subscriber或者SynchronizedSubscriber。二者的区别在于SynchronizedSubscriber通过加锁保证handler并发安全。
事件分发方式:PerThreadQueuedDispatcher线程间事件分发独立,BFS方式执行;LegacyAsyncDispatcher线程间共享事件分发等待队列,DFS方式执行
EventBus强制绑定DirectExecutor和PerThreadQueuedDispatcher;AsyncEventBus强制绑定LegacyAsyncDispatcher。上述三个因素各自独立,事件总线、线程模型、订阅者类型三者共同影响并发模型。
简单易上手。
局限于进程内使用,无法实现跨进程处理。
基于内存,且没有任何持久化机制。
不支持设置同一消息的订阅者消费顺序,默认按照注册顺序执行。
不支持消息过滤。
--> 定义事件ApplicationEvent及对应的ApplicationListener;
--> 当事件发生时,通过ApplicationEventPublisher接口实现类的publistEvent方法发布事件;
--> publistEvent方法通过ApplicationEventMulticaster接口实现类的multicastEvent方法广播事件;
-->multicastEvent方法找到事件的全部Listener,并通过invokeListener执行他们定义的事件处理逻辑;
-->invokeListener方法将通过ApplicationListenerMethodAdapter的onApplicationEvent执行定义的事件处理逻辑,这里的ApplicationListenerMethodAdapter是handler方法的适配器,负责统一handler方法的调用;
-->最后onApplicationEvent将通过processEvent方法使用反射调用,真正执行handler逻辑。
Spring Event支持在handler定义时指定过滤条件,当条件为真时才会执行具体的handler逻辑。
xxxxxxxxxx191234public class CustomerEvent {5 private String name;6}7
8("springListener")9public class SpringListener {10 11 /**12 * 监听 CustomerEvent 类型事件,但是需要满足condition条件13 */14 (condition = "#event.getName().equals('xxx')")15 public void processEvent(CustomerEvent event) {16 System.out.println("process CustomerEvent, name:" + event.getName());17 }18}19
底层实现上使用ApplicationListenerMethodAdapter作为handler方法的适配器,当事件被触发时,先判断条件是否满足,之后再执行方法。
xxxxxxxxxx181public class ApplicationListenerMethodAdapter {2 private final Method method; // processEvent3 4 private final String condition; // #event.getName().equals('xxx')5 6 public void processEvent(ApplicationEvent event) {7 Object[] args = resolveArguments(event);8 // condition是否成立判断9 if (shouldHandle(event, args)) {10 Object result = doInvoke(args);11 if (result != null) {12 handleResult(result);13 }14 else {15 logger.trace("No result object given - no result to handle");16 }17 }18}
Spring Event支持在某事件发生时,为他的全部handler指定执行顺序,当时间触发时多个handler将按照指定顺序执行。
xxxxxxxxxx1512public class EventListenerBean {3 4 (1)5 public void handleFirst(CustomEvent event) {6 System.out.println("first");7 }8
9 10 (2)11 public void handleSecond(CustomEvent event) {12 System.out.println("second");13 }14}15
其底层实现是在事件发生时,在ApplicationEventMulticaster实现类的multicastEvent方法中找到事件对应的全部Listener后,对他们按照指定的顺序排序,再执行invokeListener调用
xxxxxxxxxx371public class SimpleApplicationEventMulticaster{2 // 广播事件,获取该事件的全部Listener并逐个调用3 public void multicastEvent(ApplicationEvent event, ResolvableType eventType) {4 // do something5 for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {6 if (executor != null && listener.supportsAsyncExecution()) {7 try {8 executor.execute(() -> invokeListener(listener, event));9 }10 catch (RejectedExecutionException ex) {11 // Probably on shutdown -> invoke listener locally instead12 invokeListener(listener, event);13 }14 }15 else {16 invokeListener(listener, event);17 }18 }19}20 21public abstract class AbstractApplicationEventMulticaster{22 // 获得事件的全部Listener23 protected Collection<ApplicationListener<?>> getApplicationListeners(24 ApplicationEvent event, ResolvableType eventType) {25 // do something26 return retrieveApplicationListeners(eventType, sourceType, newRetriever);27 }28 29 // 从注册的所有监听器中找出那些对给定事件感兴趣的监听器30 private Collection<ApplicationListener<?>> retrieveApplicationListeners(31 ResolvableType eventType, Class<?> sourceType, CachedListenerRetriever retriever) {32 // do something33 // 按照@Order指定顺序排序34 AnnotationAwareOrderComparator.sort(allListeners);35 return allListeners;36}37