前言 spring 有一个事件传播机制,可以较为方便地实现一个简单的广播模型
一般使用 ApplicationEventPublisher
(org.springframework.context.ApplicationEventPublisher) 来发布一个事件,然后使用 @EventListener
或者 @TransactionalEventListener
标记的方法来接收指定的事件
这样做有什么好处呢?一方面是解耦,另一方面提高可拓展性
@EventListener 使用 我一般会这样使用
事件定义 1 2 3 @Builder public record UserUpdateEvent (Long id) {}
事件发布者: xxxService 中 1 2 3 4 5 6 7 8 9 @Transactional(rollbackFor = Exception.class) public void updateUser () { publisher.publishEvent(UserUpdateEvent.builder() .id(userId) .build()); }
监听器: xxxListener 中 1 2 3 4 @EventListener(UserUpdateEvent.class) public void onUserUpdate (final UserUpdateEvent userUpdateEvent) { log.info("onUserUpdate listener: {}" , userUpdateEvent.id()); }
在这种的用法中,但发布者发布事件之后,就会触发该事件的所有监听器,并在同一个线程中,一个一个地执行监听器的动作,最后才返回到发布事件的位置继续往下执行
如果想要执行执行监听器而不影响主流程,可以配合 @Async 注解实现异步
@EventListener 不适用场景 试想一下这样一个场景
如果事件发布时处于处于一个数据库事务当中,同步监听这个事件的监听器发出了一个异步通知给第三方系统,第三方系统接收到这个通知之后需要回查状态.那么使用上面的方案可能会遇到什么问题
回查时事件发布者所在的事务还未提交,那么就可能查询到旧数据,导致数据不一致
想要解决这个问题,有哪些处理方案呢?
使用手动事务,在提交之后再发布事件
使用 @TransactionalEventListener 注解,使监听器在事务提交后再执行
本地消息表.适用于最终一致性场景
方案1 我在实践中遇到一个问题
一般而言, spring 的事务传播机制是 Propagation.REQUIRED
,通俗来讲就是有事务时加入,没事务时新建一个事务
在上面的例子中,如果 updateUser
的调用者已经处于一个事务之中,那么 updateUser
便会加入这个事务
也就是在这种场景下说这个事务的边界并非时我们在 方案1 预想的那样,等待 updateUser
执行完毕之后,事务就被提交了.那么监听器仍然是有可能在一个事务之中被执行
将事务传播机制换成 Propagation.REQUIRES_NEW
也能解决这个问题,但这个改动不一定符合实际使用场景是吧
方案2 我重点说说
把原来的监听器改成如下这样,和之前的区别就是方法上的注解,它从 @EventListener
变成了 @TransactionalEventListener
1 2 3 4 @TransactionalEventListener(value = UserUpdateEvent.class, phase = TransactionPhase.AFTER_COMMIT) public void onUserUpdate (final UserUpdateEvent userUpdateEvent) { log.info("onUserUpdate listener: {}" , userUpdateEvent.id()); }
来看看这个注解的定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @EventListener public @interface TransactionalEventListener { TransactionPhase phase () default TransactionPhase.AFTER_COMMIT; boolean fallbackExecution () default false ; @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] value() default {}; @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] classes() default {}; @AliasFor(annotation = EventListener.class, attribute = "condition") String condition () default "" ; @AliasFor(annotation = EventListener.class, attribute = "id") String id () default "" ; }
这是个复合注解,它扩展了 @EventListener
比较重要的属性是
phase: 定义监听器的执行时机,可以配置成 BEFORE_COMMIT
, AFTER_COMMIT
, AFTER_ROLLBACK
, AFTER_COMPLETION
四种.四个配置项基本都能见名知意,最后一个 AFTER_COMPLETION
的意思是无法事务执行成功与否都会执行监听器的动作
fallbackExecution: @TransactionalEventListener
比 @EventListener
更加特别的一个地方是,如果事件的发布者没有处于一个事务中,默认不会触发监听器的动作.但如果配置 fallbackExecution
为 true
,那么在无事务时发布事件,也可以触发监听器
value 和 classes: 这两互为别名,和 @EventListener
中是一样的,表示监听的事件类型
方案3 就不讲了, spring 的事件机制不支持持久化,也不支持分布式,使用本地消息表方案建议配合 mq 食用
小结 @EventListener
适合用在无事务,或者不关心监听器的执行与事件发布者处于同一个事务中的场景
@TransactionalEventListener
适合用在存在事务,且需要控制监听器在事务执行的某个阶段(BEFORE_COMMIT
, AFTER_COMMIT
, AFTER_ROLLBACK
, AFTER_COMPLETION
)执行的场景
问题解决,如果使用 @EventListener
和 @TransactionalEventListener
同时监听同一个事件,会怎么样呢
首先,需要明确一个前提,同时使用 @EventListener
和 @TransactionalEventListener
意味着发布事件的时机在事务中(@TransactionalEventListener
也可以用在非事务环境,但一般不会这样使用吧),那么可以确认了,@EventListener
的监听器会先于 @TransactionalEventListener
被执行
理由是 @EventListener
的监听器总是在发布事件之后就被触发,而此时事务仍然处于未提交状态
@EventListener 的原理 注册 ApplicationListener 在 spring 容器启动时,会将标记了 @EventListener
的方法包装成一个 ApplicationListener
对象存放到容器中
ApplicationListener
中包含了监听器被触发是需要执行的动作,也就是 onApplicationEvent
方法
在 ApplicationListenerMethodAdapter
(org.springframework.context.event.ApplicationListenerMethodAdapter) 中,它的 onApplicationEvent
方法如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void onApplicationEvent (ApplicationEvent event) { processEvent(event); } public void processEvent (ApplicationEvent event) { Object[] args = resolveArguments(event); if (shouldHandle(event, args)) { Object result = doInvoke(args); if (result != null ) { handleResult(result); } else { logger.trace("No result object given - no result to handle" ); } } }
再看一下事件的发布者做了什么 在 SimpleApplicationEventMulticaster
(org.springframework.context.event.SimpleApplicationEventMulticaster) 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void multicastEvent (ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event)); Executor executor = getTaskExecutor(); for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { if (executor != null && listener.supportsAsyncExecution()) { executor.execute(() -> invokeListener(listener, event)); } else { invokeListener(listener, event); } } }
根据事件类型取出所有的监听器
这里的源码我也没太看明白,但逻辑就是根据事件类型 取出监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 protected Collection<ApplicationListener<?>> getApplicationListeners( ApplicationEvent event, ResolvableType eventType) { Object source = event.getSource(); Class<?> sourceType = (source != null ? source.getClass() : null ); ListenerCacheKey cacheKey = new ListenerCacheKey (eventType, sourceType); CachedListenerRetriever newRetriever = null ; CachedListenerRetriever existingRetriever = this .retrieverCache.get(cacheKey); if (existingRetriever == null ) { if (this .beanClassLoader == null || (ClassUtils.isCacheSafe(event.getClass(), this .beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this .beanClassLoader)))) { newRetriever = new CachedListenerRetriever (); existingRetriever = this .retrieverCache.putIfAbsent(cacheKey, newRetriever); if (existingRetriever != null ) { newRetriever = null ; } } } if (existingRetriever != null ) { Collection<ApplicationListener<?>> result = existingRetriever.getApplicationListeners(); if (result != null ) { return result; } } return retrieveApplicationListeners(eventType, sourceType, newRetriever); }
然后循环挨个触发监听器动作 一路追下去,可以在 invokeListener
-> doInvokeListener
-> listener.onApplicationEvent(event)
看到 onApplicationEvent
被触发了.完成了一个闭环
@TransactionalEventListener 的原理 上面说的是 @EventListener 的原理,那么 @TransactionalEventListener 会怎么样呢
它们区别在于注册监听器类型不一样
@EventListener
使用了 ApplicationListenerMethodAdapter
包装它的执行动作
@TransactionalEventListener
使用了 TransactionalApplicationListenerMethodAdapter
包装它的执行动作
1 2 3 public class TransactionalApplicationListenerMethodAdapter extends ApplicationListenerMethodAdapter implements TransactionalApplicationListener <ApplicationEvent> {}
此外, TransactionalApplicationListenerMethodAdapter
额外保存了 @TransactionalEventListener
注解特有的 transactionPhase
和 fallbackExecution
属性
TransactionalApplicationListenerMethodAdapter
还重写了 onApplicationEvent
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public void onApplicationEvent (ApplicationEvent event) { if (TransactionalApplicationListenerSynchronization.register(event, this , this .callbacks)) { if (logger.isDebugEnabled()) { logger.debug("Registered transaction synchronization for " + event); } } else if (this .fallbackExecution) { if (getTransactionPhase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) { logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase" ); } processEvent(event); } else { if (logger.isDebugEnabled()) { logger.debug("No transaction is active - skipping " + event); } } }
最重要的就是 TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks)
它依赖了 spring 事务,注册了一个事件,方便在事务的生命周期内执行注册的动作. register
方法会判断当前是否处于一个事务中,如果不处于事务中,则会返回 false
如果事务未开启,则会进入第二个 if 判断,即 fallbackExecution
设置为 true
时,即便没有事务也会执行
否则,什么都不做
小结 对比 @EventListener
和 @TransactionalEventListener
,会发现,其实这两在底层代码上的区别并不是很大.
@TransactionalEventListener
的主流程和 @EventListener
大差不差
spring 启动时注册 监听器 为 ApplicationListener
事件发布者发布事件,触发 onApplicationEvent 方法
反射调用 监听器方法
区别在于 @TransactionalEventListener
在触发 onApplicationEvent
时通过 TransactionalApplicationListenerSynchronization.register
将监听器事件注册给了事务管理器,之后的触发逻辑就由事务管理器来负责了
TransactionalApplicationListenerSynchronization 最后我们来看看 TransactionalApplicationListenerSynchronization
的 register
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static <E extends ApplicationEvent > boolean register ( E event, TransactionalApplicationListener<E> listener, List<TransactionalApplicationListener.SynchronizationCallback> callbacks) { if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) { TransactionSynchronizationManager.registerSynchronization(new PlatformSynchronization <>(event, listener, callbacks)); return true ; } else if (event.getSource() instanceof TransactionContext txContext) { TransactionSynchronizationManager rtsm = new TransactionSynchronizationManager (txContext); if (rtsm.isSynchronizationActive() && rtsm.isActualTransactionActive()) { rtsm.registerSynchronization(new ReactiveSynchronization <>(event, listener, callbacks)); return true ; } } return false ; }
我的环境使用的是普通的事务,走第一个 if 判断.如果事务处于执行状态,则将监听器事件包装起来并注册到事务管理器中
1 2 3 4 5 6 7 8 9 public static void registerSynchronization (TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null" ); Set<TransactionSynchronization> synchs = synchronizations.get(); if (synchs == null ) { throw new IllegalStateException ("Transaction synchronization is not active" ); } synchs.add(synchronization); }
后面的触发实际就有 spring 事务管理了
再看看提交给事务管理器的 PlatformSynchronization
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private static class PlatformSynchronization <AE extends ApplicationEvent > extends TransactionalApplicationListenerSynchronization <AE> implements org .springframework.transaction.support.TransactionSynchronization { public PlatformSynchronization (AE event, TransactionalApplicationListener<AE> listener, List<TransactionalApplicationListener.SynchronizationCallback> callbacks) { super (event, listener, callbacks); } @Override public void beforeCommit (boolean readOnly) { if (getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) { processEventWithCallbacks(); } } @Override public void afterCompletion (int status) { TransactionPhase phase = getTransactionPhase(); if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) { processEventWithCallbacks(); } else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) { processEventWithCallbacks(); } else if (phase == TransactionPhase.AFTER_COMPLETION) { processEventWithCallbacks(); } } }
刚好处理了 @TransactionalEventListener
中 phase
的四个配置项
而它实现的接口 TransactionSynchronization
定义了事务的生命周期钩子方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public interface TransactionSynchronization extends Ordered , Flushable { int STATUS_COMMITTED = 0 ; int STATUS_ROLLED_BACK = 1 ; int STATUS_UNKNOWN = 2 ; @Override default int getOrder () { return Ordered.LOWEST_PRECEDENCE; } default void suspend () { } default void resume () { } @Override default void flush () { } default void beforeCommit (boolean readOnly) { } default void beforeCompletion () { } default void afterCommit () { } default void afterCompletion (int status) { } }
最后 本文简单阐述了 @EventListener
和 @TransactionalEventListener
的使用场景,以及区别,尝试结合源码来说明事件的发布流程