浅谈 @EventListener 和 @TransactionalEventListener

浅谈 @EventListener 和 @TransactionalEventListener

前言

spring 有一个事件传播机制,可以较为方便地实现一个简单的广播模型

一般使用 ApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher) 来发布一个事件,然后使用 @EventListener 或者 @TransactionalEventListener 标记的方法来接收指定的事件

这样做有什么好处呢?一方面是解耦,另一方面提高可拓展性

@EventListener 使用

我一般会这样使用

事件定义
1
2
3
@Builder
public record UserUpdateEvent(Long id) {
}
事件发布者: xxxService 中
1
2
3
4
5
6
7
8
9
@Transactional(rollbackFor = Exception.class)
public void updateUser() {
// 其他业务逻辑

// 发布一个 用户修改 事件
publisher.publishEvent(UserUpdateEvent.builder()
.id(userId)
.build());
}
监听器: xxxListener 中
1
2
3
4
@EventListener(UserUpdateEvent.class)
public void onUserUpdate(final UserUpdateEvent userUpdateEvent) {
log.info("onUserUpdate listener: {}", userUpdateEvent.id());
}

在这种的用法中,但发布者发布事件之后,就会触发该事件的所有监听器,并在同一个线程中,一个一个地执行监听器的动作,最后才返回到发布事件的位置继续往下执行

如果想要执行执行监听器而不影响主流程,可以配合 @Async 注解实现异步

@EventListener 不适用场景

试想一下这样一个场景

如果事件发布时处于处于一个数据库事务当中,同步监听这个事件的监听器发出了一个异步通知给第三方系统,第三方系统接收到这个通知之后需要回查状态.那么使用上面的方案可能会遇到什么问题

  • 第三方系统回查状态时,事件发布者所在的事务已经完成提交
  • 第三方系统回查状态时,事件发布者所在的事务还未提交

回查时事件发布者所在的事务还未提交,那么就可能查询到旧数据,导致数据不一致

回查状态不一致

想要解决这个问题,有哪些处理方案呢?

  1. 使用手动事务,在提交之后再发布事件
  2. 使用 @TransactionalEventListener 注解,使监听器在事务提交后再执行
  3. 本地消息表.适用于最终一致性场景

方案1 我在实践中遇到一个问题

一般而言, spring 的事务传播机制是 Propagation.REQUIRED,通俗来讲就是有事务时加入,没事务时新建一个事务

在上面的例子中,如果 updateUser 的调用者已经处于一个事务之中,那么 updateUser 便会加入这个事务

也就是在这种场景下说这个事务的边界并非时我们在 方案1 预想的那样,等待 updateUser 执行完毕之后,事务就被提交了.那么监听器仍然是有可能在一个事务之中被执行

将事务传播机制换成 Propagation.REQUIRES_NEW 也能解决这个问题,但这个改动不一定符合实际使用场景是吧

方案2 我重点说说

把原来的监听器改成如下这样,和之前的区别就是方法上的注解,它从 @EventListener 变成了 @TransactionalEventListener

1
2
3
4
@TransactionalEventListener(value = UserUpdateEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void onUserUpdate(final UserUpdateEvent userUpdateEvent) {
log.info("onUserUpdate listener: {}", userUpdateEvent.id());
}

来看看这个注解的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {

TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

boolean fallbackExecution() default false;

@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] value() default {};

@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] classes() default {};

@AliasFor(annotation = EventListener.class, attribute = "condition")
String condition() default "";

@AliasFor(annotation = EventListener.class, attribute = "id")
String id() default "";

}

这是个复合注解,它扩展了 @EventListener

比较重要的属性是

  • phase: 定义监听器的执行时机,可以配置成 BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK, AFTER_COMPLETION 四种.四个配置项基本都能见名知意,最后一个 AFTER_COMPLETION 的意思是无法事务执行成功与否都会执行监听器的动作
  • fallbackExecution: @TransactionalEventListener@EventListener 更加特别的一个地方是,如果事件的发布者没有处于一个事务中,默认不会触发监听器的动作.但如果配置 fallbackExecutiontrue,那么在无事务时发布事件,也可以触发监听器
  • value 和 classes: 这两互为别名,和 @EventListener 中是一样的,表示监听的事件类型

方案3 就不讲了, spring 的事件机制不支持持久化,也不支持分布式,使用本地消息表方案建议配合 mq 食用

小结

@EventListener 适合用在无事务,或者不关心监听器的执行与事件发布者处于同一个事务中的场景

@TransactionalEventListener 适合用在存在事务,且需要控制监听器在事务执行的某个阶段(BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK, AFTER_COMPLETION )执行的场景

问题解决,如果使用 @EventListener@TransactionalEventListener 同时监听同一个事件,会怎么样呢

首先,需要明确一个前提,同时使用 @EventListener@TransactionalEventListener 意味着发布事件的时机在事务中(@TransactionalEventListener 也可以用在非事务环境,但一般不会这样使用吧),那么可以确认了,@EventListener 的监听器会先于 @TransactionalEventListener 被执行

理由是 @EventListener 的监听器总是在发布事件之后就被触发,而此时事务仍然处于未提交状态

@EventListener 的原理

注册 ApplicationListener

在 spring 容器启动时,会将标记了 @EventListener 的方法包装成一个 ApplicationListener 对象存放到容器中

ApplicationListener 中包含了监听器被触发是需要执行的动作,也就是 onApplicationEvent 方法

ApplicationListenerMethodAdapter(org.springframework.context.event.ApplicationListenerMethodAdapter) 中,它的 onApplicationEvent 方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void onApplicationEvent(ApplicationEvent event) {
// 触发监听器动作
processEvent(event);
}

public void processEvent(ApplicationEvent event) {
// 解析参数
Object[] args = resolveArguments(event);
// 根据 condition 属性判断是否需要触发
if (shouldHandle(event, args)) {
// 反射执行监听器动作
Object result = doInvoke(args);
if (result != null) {
handleResult(result);
}
else {
logger.trace("No result object given - no result to handle");
}
}
}

再看一下事件的发布者做了什么

SimpleApplicationEventMulticaster(org.springframework.context.event.SimpleApplicationEventMulticaster) 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null && listener.supportsAsyncExecution()) {
// 异步执行
executor.execute(() -> invokeListener(listener, event));
}
else {
// 同步执行
invokeListener(listener, event);
}
}
}

根据事件类型取出所有的监听器

这里的源码我也没太看明白,但逻辑就是根据事件类型取出监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {

Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);

// Potential new retriever to populate
CachedListenerRetriever newRetriever = null;

// Quick check for existing entry on ConcurrentHashMap
CachedListenerRetriever existingRetriever = this.retrieverCache.get(cacheKey);
if (existingRetriever == null) {
// Caching a new ListenerRetriever if possible
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
newRetriever = new CachedListenerRetriever();
existingRetriever = this.retrieverCache.putIfAbsent(cacheKey, newRetriever);
if (existingRetriever != null) {
newRetriever = null; // no need to populate it in retrieveApplicationListeners
}
}
}

if (existingRetriever != null) {
Collection<ApplicationListener<?>> result = existingRetriever.getApplicationListeners();
if (result != null) {
return result;
}
// If result is null, the existing retriever is not fully populated yet by another thread.
// Proceed like caching wasn't possible for this current local attempt.
}

return retrieveApplicationListeners(eventType, sourceType, newRetriever);
}

然后循环挨个触发监听器动作

一路追下去,可以在 invokeListener -> doInvokeListener -> listener.onApplicationEvent(event) 看到 onApplicationEvent 被触发了.完成了一个闭环

@TransactionalEventListener 的原理

上面说的是 @EventListener 的原理,那么 @TransactionalEventListener 会怎么样呢

它们区别在于注册监听器类型不一样

@EventListener 使用了 ApplicationListenerMethodAdapter 包装它的执行动作

@TransactionalEventListener 使用了 TransactionalApplicationListenerMethodAdapter 包装它的执行动作

1
2
3
// TransactionalApplicationListenerMethodAdapter 继承了 ApplicationListenerMethodAdapter
public class TransactionalApplicationListenerMethodAdapter extends ApplicationListenerMethodAdapter
implements TransactionalApplicationListener<ApplicationEvent> {}

此外, TransactionalApplicationListenerMethodAdapter 额外保存了 @TransactionalEventListener 注解特有的 transactionPhasefallbackExecution 属性

TransactionalApplicationListenerMethodAdapter 还重写了 onApplicationEvent 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks)) {
if (logger.isDebugEnabled()) {
logger.debug("Registered transaction synchronization for " + event);
}
}
else if (this.fallbackExecution) {
if (getTransactionPhase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}

最重要的就是 TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks)

它依赖了 spring 事务,注册了一个事件,方便在事务的生命周期内执行注册的动作. register 方法会判断当前是否处于一个事务中,如果不处于事务中,则会返回 false

如果事务未开启,则会进入第二个 if 判断,即 fallbackExecution 设置为 true 时,即便没有事务也会执行

否则,什么都不做

小结

对比 @EventListener@TransactionalEventListener,会发现,其实这两在底层代码上的区别并不是很大.

@TransactionalEventListener 的主流程和 @EventListener 大差不差

  1. spring 启动时注册 监听器 为 ApplicationListener

  2. 事件发布者发布事件,触发 onApplicationEvent 方法

  3. 反射调用 监听器方法

区别在于 @TransactionalEventListener 在触发 onApplicationEvent 时通过 TransactionalApplicationListenerSynchronization.register 将监听器事件注册给了事务管理器,之后的触发逻辑就由事务管理器来负责了

TransactionalApplicationListenerSynchronization

最后我们来看看 TransactionalApplicationListenerSynchronizationregister 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static <E extends ApplicationEvent> boolean register(
E event, TransactionalApplicationListener<E> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {
// 事务处于执行状态
if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(new PlatformSynchronization<>(event, listener, callbacks));
return true;
}
// 和响应式事务有关
else if (event.getSource() instanceof TransactionContext txContext) {
TransactionSynchronizationManager rtsm = new TransactionSynchronizationManager(txContext);
if (rtsm.isSynchronizationActive() && rtsm.isActualTransactionActive()) {
rtsm.registerSynchronization(new ReactiveSynchronization<>(event, listener, callbacks));
return true;
}
}
return false;
}

我的环境使用的是普通的事务,走第一个 if 判断.如果事务处于执行状态,则将监听器事件包装起来并注册到事务管理器中

1
2
3
4
5
6
7
8
9
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchs.add(synchronization);
}

后面的触发实际就有 spring 事务管理了

再看看提交给事务管理器的 PlatformSynchronization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static class PlatformSynchronization<AE extends ApplicationEvent>
extends TransactionalApplicationListenerSynchronization<AE>
implements org.springframework.transaction.support.TransactionSynchronization {

public PlatformSynchronization(AE event, TransactionalApplicationListener<AE> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {

super(event, listener, callbacks);
}

@Override
public void beforeCommit(boolean readOnly) {
// 事务提交前
if (getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
processEventWithCallbacks();
}
}

@Override
public void afterCompletion(int status) {
TransactionPhase phase = getTransactionPhase();
// 事务提交后
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEventWithCallbacks();
}
// 事务回滚
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEventWithCallbacks();
}
// 无论如何都执行
else if (phase == TransactionPhase.AFTER_COMPLETION) {
processEventWithCallbacks();
}
}
}

刚好处理了 @TransactionalEventListenerphase 的四个配置项

而它实现的接口 TransactionSynchronization 定义了事务的生命周期钩子方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public interface TransactionSynchronization extends Ordered, Flushable {

int STATUS_COMMITTED = 0;

int STATUS_ROLLED_BACK = 1;

int STATUS_UNKNOWN = 2;

@Override
default int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}

default void suspend() {
}

default void resume() {
}

@Override
default void flush() {
}

default void beforeCommit(boolean readOnly) {
}

default void beforeCompletion() {
}

default void afterCommit() {
}

default void afterCompletion(int status) {
}

}

最后

本文简单阐述了 @EventListener@TransactionalEventListener 的使用场景,以及区别,尝试结合源码来说明事件的发布流程

浅谈 @EventListener 和 @TransactionalEventListener

https://wuhunyu.top/chat/2025/08/event-listener/index.html

作者

wuhunyu

发布于

2025-08-20

更新于

2025-09-11

许可协议