1. 背景
最近读了Spring声明式事务相关源码,现在将相关原理及本人注释过的实现源码整理到博客上并对一些工作中的案例与事务源码中的参数进行总结。
2. 基本概念
2.1 基本名词解释
| 名词 | 概念 | | ————- |:————-| |PlatformTransactionManager|事务管理器,管理事务的各生命周期方法,下文简称TxMgr| |TransactionAttribute | 事务属性, 包含隔离级别,传播行为,是否只读等信息,下文简称TxAttr | |TransactionStatus | 事务状态,下文简称TxStatus| |TransactionInfo | 事务信息,内含TxMgr, TxAttr, TxStatus等信息,下文简称TxInfo| |TransactionSynchronization|事务同步回调,内含多个钩子方法,下文简称TxSync / transaction synchronization| |TransactionSynchronizationManager|事务同步管理器,维护当前线程事务资源,信息以及TxSync集合|
2.2 七种事务传播行为
Spring中的Propagation枚举和TransactionDefinition接口定义了7种事务传播行为:
- REQUIRED 如果当前无事务则开启一个事务,否则加入当前事务。
- SUPPORTS 如果当前有事务则加入当前事务。
- MANDATORY 如果当前无事务则抛出异常,否则加入当前事务。
- REQUIRES_NEW 如果当前无事务则开启一个事务,否则挂起当前事务并开启新事务。
- NOT_SUPPORTED 如果当前有事务,则挂起当前事务以无事务状态执行方法。
- NEVER 如果当前有事务,则抛出异常。
- NESTED 创建一个嵌套事务,如果当前无事务则创建一个事务。
2.3 套路讲解
这里介绍以下Spring声明式事务的套路。如果能知晓大致套路会对接下来看源码有很大的帮助。文笔不是太好,下面的描述如有不清,还请指出。
2.3.1 事务拦截器
我们给一个bean的方法加上@Transactional注解后,Spring容器给我们的是一个代理的bean。当我们对事务方法调用时,会进入Spring的ReflectiveMethodInvocation#proceed方法。这是AOP的主要实现,在进入业务方法前会调用各种方法拦截器,我们需要关注的拦截器是org.springframework.transaction.interceptor.TransactionInterceptor。 TransactionInterceptor的职责类似于一个“环绕切面”,在业务方法调用前根据情况开启事务,在业务方法调用完回到拦截器后进行善后清理。
事务切面在源码中具体的实现方法是TransactionAspectSupport#invokeWithinTransaction,相信平时大家debug的时候在调用栈中经常能看到此方法。事务切面关注的是TransactionInfo(TxInfo),TxInfo是一个“非常大局观”的东西(里面啥都有:TxMgr, TxAttr, TxStatus还有前一次进入事务切面的TransactionInfo)。
因此事务切面会调用createTransactionIfNecessary方法来创建事务并拿到一个TxInfo(无论是否真的物理创建了一个事务)。如果事务块内的代码发生了异常,则会根据TxInfo里面的TxAttr配置的rollback规则看看这个异常是不是需要回滚,不需要回滚就尝试提交,否则就尝试回滚。如果未发生异常,则尝试提交。
2.3.2 提交与回滚
事务切面对于尝试提交会判断是否到了最外层事务(某个事务边界)。举个例子:有四个事务方法依次调用,传播行为分别是 方法1:REQUIRED, 方法2:REQUIRED, 方法3: REQUIRES_NEW, 方法4: REQUIRED。很显然这其中包含了两个独立的物理事务,当退栈到方法4的事务切面时,会发现没有到事务最外层,所以不会有真正的物理提交。而在退栈到了方法3对应的事务切面时会发现是外层事务,此时会发生物理提交。同理,退栈到方法1的事务切面时也会触发物理提交。
那么问题来了,Spring是怎么判断这所谓“最外层事务”的呢。 答案是TxStatus中有个属性叫newTransaction用于标记是否是新建事务(根据事务传播行为得出,比如加入已有事务则会是false),以及一个名为transaction的Object用于表示物理事务对象(由具体TxMgr子类负责给出)。Spring会根据每一层事务切面创建的TxStatus内部是否持有transaction对象以及newTransaction标志位判断是否属于外层事务。
类似的,Spring对于回滚事务也是会在最外层事务方法对应的切面中进行物理回滚。而在非最外层事务的时候会由具体txMgr子类给对应的事务打个的标记用于标识这个事务该回滚,这样的话在所有同一物理事务方法退栈过程中在事务切面中都能读取到事务被打了应该回滚的标记。可以说这是同一物理事务方法之间进行通信的机制。
2.3.3 ThreadLocal的使用
Spring事务代码中用ThreadLocal来进行资源与事务的生命周期的同步管理。
在事务切面层面,TransactionAspectSupport里面有个transactionInfoHolder的ThreadLocal对象,用于把TxInfo绑定到线程。那么这样在我们的业务代码或者其他切面中,我们可以拿到TxInfo,也能拿到TxStatus。拿到TxStatus我们就可以调用setRollbackOnly来打标以手动控制事务必须回滚。
TransactionSynchronizationManager是Spring事务代码中对ThreadLocal使用最多的类,目前它内部含有6个ThreadLocal,分别是:
- resources 类型为
Map<Object, Object>用于保存事务相关资源,比如我们常用的DataSourceTransactionManager会在开启物理事务的时候把<DataSource, ConnectionHolder>绑定到线程。 这样在事务作用的业务代码中可以通过Spring的DataSourceUtils拿到绑定到线程的ConnectionHolder中的Connection。事实上对于MyBatis来说与Spring集成时就是这样拿的。 - synchronizations 类型为
Set<TransactionSynchronization>用于保存transaction synchronization,这个可以理解为是回调钩子对象,内部含有beforeCommit, afterCommit, beforeCompletion等钩子方法。 我们自己如果需要的话也可以在业务方法或者切面中注册一些transaction synchronization对象用于追踪事务生命周期做一些自定义的事情。 - currentTransactionName 当前事务名
- currentTransactionReadOnly 当前事务是否只读
- currentTransactionIsolationLevel 当前事务隔离级别
- actualTransactionActive 是否存在物理事务,比如传播行为为NOT_SUPPORTED时就会是false。
2.4 几幅草图
下面是我做的几张图,比较丑陋。举了三个不同的事务传播情况,列一下TxInfo的信息(TxInfo中主要列了TxStatus的一些关键字段以及oldTransactionInfo字段)
最常见的REQUIRED调用REQUIRED
REQUIRED调用REQUIRES_NEW
REQUIRED调用NOT_SUPPORTED
3.源码实现
直接进入源码实现部分,最好需要对Spring AOP,拦截器,代理等有个基本认知阅读起来会比较轻松。
3.1 事务的入口–TransactionInterceptor
TransactionInterceptor是Spring实现声明式事务的拦截器,它实现了AOP联盟的MethodInterceptor接口,它的父类TransactionAspectSupport封装了一些用于实现事务切面对事务进行管理的基本代码。下面来看一下TransactionInterceptor的继承关系。 
3.1.1 TransactionInterceptor#invoke
TransactionInterceptor对于MethodInterceptor#invoke的实现很简单,就是调用父类的的invokeWithinTransaction,并传递给此方法一个回调用于继续后续的拦截调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 因为这里的invocation.getThis可能是一个代理类,需要获取目标原生class。
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 调用父类TransactionAspectSupport的invokeWithinTransaction方法,第三个参数是一个简易回调实现,用于继续方法调用链。
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
3.1.2 TransactionAspectSupport#invokeWithinTransaction
这里就是上面TransactionInterceptor调用的invokeWithinTransaction实现,可以将之看作是一个大的环绕切面,将事务的创建与提交/回滚包在事务方法的外围。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
// 获取TransactionAttribute、PlatformTransactionManager、以及连接点方法信息。
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 根据上面抓取出来的txAttribute, tm, 连接点方法等信息判断是否需要开启事务。
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// 执行回调,如果没有后续拦截器的话,就进入事务方法了。
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 事务发生异常。
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 把上一层事务的TxInfo重新绑到ThreadLocal中。
cleanupTransactionInfo(txInfo);
}
// 事务未发生异常。
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
return new ThrowableHolder(ex);
}
}
finally {
cleanupTransactionInfo(txInfo);
}
}
});
if (result instanceof ThrowableHolder) {
throw ((ThrowableHolder) result).getThrowable();
}
else {
return result;
}
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
}
}
3.1.3 TransactionAspectSupport#createTransactionIfNecessary
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果事务属性中name为null,则创建一个简易委托类,name为连接点方法标识。
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 根据事务属性判断是否需要开启事务,并返回状态。
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
// 事务方法。
if (txAttr != null) {
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
}
else {
// 非事务方法。
if (logger.isTraceEnabled())
logger.trace("Don't need to create transaction for [" + joinpointIdentification +
"]: This method isn't transactional.");
}
// 无论是否创建了新事务,这里都会把当前的txInfo对象通过threadLocal变量绑定到当前线程。
txInfo.bindToThread();
return txInfo;
}
3.2 事务的管理–AbstractPlatformTransactionManager
AbstractPlatformTransactionManager是各种事务管理器的抽象基类,也可以说是骨架。它封装了很多事务管理的流程代码,子类需要实现一些模板方法。下面列出一些主要的模板方法。
- doGetTransaction 用于从TxMgr中拿一个事务对象,事务对象具体什么类型AbstractPlatformTransactionManager并不care。如果当前已经有事务的话,返回的对象应该是要包含当前事务信息的。
- isExistingTransaction 用于判断一个事务对象是否对应于一个已经存在的事务。Spring会根据这个方法的返回值来分类讨论事务该如何传播。
- doBegin 物理开启事务。
- doSuspend 将当前事务资源挂起。对于我们常用的DataSourceTransactionManager,它的资源就是ConnectionHolder。会将ConnectionHolder与当前线程脱钩并取出来。
- doResume 恢复当前事务资源。对于我们常用的DataSourceTransactionManager,它会将ConnectionHolder重新绑定到线程上。
- doCommit 物理提交事务。
- doRollback 物理回滚事务。
- doSetRollbackOnly 给事务标记为回滚。对于我们常用的DataSourceTransactionManager,它的实现是拿出事务对象中的ConnectionHolder打上回滚标记。这个标记是一种“全局的标记”,因为隶属于同一个物理事务都能读到同一个ConnectionHolder。
我们首先从上面createTransactionIfNecessary方法中调用到的getTransaction方法开始看起。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
// 根据具体的tm实现获取一个transaction对象。
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
definition = new DefaultTransactionDefinition();
}
// 已经存在事务的情况。
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// timeout不能小于-1。
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// 如果传播行为是MANDATORY,则应该抛出异常(因为此时不存在事务)
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 传播行为是REQUIRED, REQUIRES_NEW, NESTED。
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 注意这里的newTransaction标识位是true。
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 调用供子类实现的模板方法doBegin来开启事务。
doBegin(transaction, definition);
// 调用TransactionSynchronizationManager来保存一些事务上下文信息。
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
/*
* 这里的else分支说明传播行为是SUPPORTS或NOT_SUPPORTED或NEVER,这几种情况对于当前无事务的逻辑都是直接继续运行。
*/
else {
// 如果有指定事务隔离级别,则可以打warn日志报出指定隔离级别开启但没有事务的警告。
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 注意这里的transaction传的是null,这在尝试commit的时候会判断出其实没有实际需要提交的事务。
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
/**
* prepareSynchronization方法根据status是否需要维护新的事务相关资源,信息与回调。
*/
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
下面我们来看一下对于当前已经有事务的情况下,Spring是如何处理的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
// 传播行为为NEVER,抛出异常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 传播行为为NOT_SUPPORTED,则挂起当前事务。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 传播行为为REQUIRES_NEW,则挂起当前事务并开启新事务运行。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开启事务,由具体子类TxMgr实现。
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
// 传播行为为NESTED。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 如果不支持嵌套事务抛出异常。
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 是否对嵌套事务创建还原点。
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 这里最终会委托SavePointManager去创建还原点。
status.createAndHoldSavepoint();
return status;
}
else {
// 通常tm如果是JTA的话会走到这里来,这种情况就通过嵌套的begin和commit/rollback实现。
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// 此时说明传播行为为SUPPORTS或REQUIRED或MANDATORY,则直接加入当前事务即可。
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
/*
* validateExistingTransaction是用来对SUPPORTS和REQUIRED的传播行为进行事务定义校验的开关。
* 默认是不开启的,此时在加入事务的时候内层注解的一些设定相当于会被忽略。
*/
if (isValidateExistingTransaction()) {
// 如果自定义了隔离级别校验与已有事务是否匹配。
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
// 如果已有事务隔离级别为null(说明是默认级别)或者已有事务隔离级别不等于当前事务定义的隔离级别抛出异常。
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
// 如果已有事务为只读,但本方法事务定义为非只读,则抛出异常。
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
3.3 事务的挂起与恢复–suspend & resume方法
3.3.1 事务的挂起–suspend方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* 本方法做的事情主要就是抓取当前事务资源,事务基本信息以及事务回调transaction synchronization等塞到SuspendedResourcesHolder中返回。
*/
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
/*
* 在TransactionSynchronizationManager的JavaDoc上已经写明了在需要进行transaction synchronization注册的时候需要先检查当前线程是否激活
* isSynchronizationActive是去读TransactionSynchronizationManager中当前线程绑定的TransactionSynchronization集合是否为null。
*/
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 对所有本线程当前注册的transaction synchronization调用suspend方法。
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 挂起当前事务,由具体的TxMgr子类实现。
suspendedResources = doSuspend(transaction);
}
/*
* 将当前线程绑定的事务名,是否只读,隔离级别,是否有实际事务等信息抓取出来。
* 与刚才抓取出来的transaction synchronization集合一起包到SuspendedResourcesHolder中返回。
*/
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException ex) {
// 恢复transaction synchronization。
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
catch (Error err) {
// 恢复transaction synchronization。
doResumeSynchronization(suspendedSynchronizations);
throw err;
}
}
else if (transaction != null) {
// 挂起当前事务。
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// 啥都没有。
return null;
}
}
private List<TransactionSynchronization> doSuspendSynchronization() {
// getSynchronizations返回的是一个不可变的快照。
List<TransactionSynchronization> suspendedSynchronizations =
TransactionSynchronizationManager.getSynchronizations();
// 逐一挂起。
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.suspend();
}
// 清理之后除非重新init否则无法再register新的transaction synchronization了。
TransactionSynchronizationManager.clearSynchronization();
return suspendedSynchronizations;
}
3.3.2 事务的恢复-resume方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder) throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
// 恢复挂起的事务资源,由具体的TxMgr子类实现。
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
// 还原挂起的事务的一些信息。
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
// 恢复挂起的transaction syncrhonization。
doResumeSynchronization(suspendedSynchronizations);
}
}
}
/**
* 重新初始化当前线程维护的synchronization集合,逐一对这些synchronization调用resume方法并加入到集合中。
*/
private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
TransactionSynchronizationManager.initSynchronization();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.resume();
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}
3.4 事务的提交与回滚–commit&rollback
commit与rollback两个方法是PlatformTransactionManager接口两个关键方法。 一般在事务切面增强的方法成功情况下会调用commit方法。在事务发生异常后,completeTransactionAfterThrowing方法会根据异常与事务规则是否匹配来决定是否需要回滚。如果需要回滚则调用rollback方法。
需要注意的是commit/rollback方法只是尝试,Spring会根据事务状态信息来具体处理,不代表一定会物理提交/回滚,Spring会在事务最外层边界才可能触发物理提交/回滚,甚至也有可能调用commit后发现需要rollback。
3.4.1 事务的提交–commit方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
public final void commit(TransactionStatus status) throws TransactionException {
// 如果事务已经完成了,则抛出异常。
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
/*
* 通过前文的源码分析可以知道无论事务怎么传播,每一次进入事务拦截器,在进入业务方法之前都会把一个TransactionInfo对象塞到
* transactionInfoHolder这个线程本地变量中。而TransactionInfo包含了一个TransactionStatus对象。
* commit方法是在业务方法正常完成后调用的,所谓isLocalRollbackOnly就是读当前TransactionStatus对象中的rollbackOnly标志位。
* 正如其名,它是一个局部的标志位,只有创建该status的那一层在业务方法执行完毕后会读到本层status的这个局部标志位。
*
* 我们可以在用户代码(业务方法或者切面)中通过TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
* 来置当前事务层的status对象的rollbackOnly标志位为true以手动控制回滚。
*/
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus);
return;
}
/*
* shouldCommitOnGlobalRollbackOnly默认实现是false。
* 这里判断的语义就是如果发现事务被标记全局回滚并且在全局回滚标记情况下不应该提交事务的话,则进行回滚。
*
* 我们通常用的DataSourceTransactionManager对于isGlobalRollbackOnly的判断是去读status中transaction对象的ConnectionHolder的rollbackOnly标志位。
*/
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus);
/*
* isNewTransaction用来判断是否是最外层(事务边界)。
* 举个例子:传播行为REQUIRE方法调REQUIRE方法再调REQUIRE方法,第三个方法抛出异常,第二个方法捕获,第一个方法走到这里会发现到了最外层事务边界。
* 而failEarlyOnGlobalRollbackOnly是一个标志位,如果开启了则会尽早抛出异常。
*
* 默认情况下failEarlyOnGlobalRollbackOnly开关是关闭的。这样如果内层事务发生了异常,退栈到外层事务后,代码走到这里回滚完后会抛出UnexpectedRollbackException。
*/
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
return;
}
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
// 钩子函数,TxMgr子类可以覆盖默认的空实现。
prepareForCommit(status);
// 回调transaction synchronization的beforeCommit方法。
triggerBeforeCommit(status);
// 回调transaction synchronization的beforeCompletion方法。
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
// 最外层事务边界。
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
// 由具体TxMgr子类实现。
doCommit(status);
}
/*
* 我们一般用的DataSourceTransactionManager是不会走到这里的。
* 因为默认shouldCommitOnGlobalRollbackOnly开关是关闭的,检测到golobalRollbackOnly是不会走到processCommit方法的。
*
* 但shouldCommitOnGlobalRollbackOnly这个开关对于JtaTransactionManager来说是默认开启的,这里主要是需要针对检测到globalRollbackOnly但是doCommit没有抛出异常的情况。
*/
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// 回调transaction synchronization的afterCompletion方法。
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// doCommit发生异常后,根据rollbackOnCommitFailure开关决定是否回滚,此开关默认关闭。
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
// 回调transaction synchronization的afterCompletion方法。
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, err);
throw err;
}
try {
// 回调transaction synchronization的afterCommit方法。
triggerAfterCommit(status);
}
finally {
// 回调transaction synchronization的afterCompletion方法。
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 后续工作:status标记completed,在最外层清空transaction synchronization集合,恢复挂起事务资源等等。
cleanupAfterCompletion(status);
}
}
3.4.2 事务的回滚–rollback方法
AbstractPlatformTransactionManager#rollback方法,否则调用commit方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus);
}
private void processRollback(DefaultTransactionStatus status) {
try {
try {
// 回调transaction synchronization对象的beforeCompletion方法。
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
// 在最外层事务边界进行回滚。
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 由具体TxMgr子类实现回滚。
doRollback(status);
}
else if (status.hasTransaction()) {
/*
* 内层事务被标记为rollBackOnly或者globalRollbackOnParticipationFailure开关开启时,给当前事务标记需要回滚。
*
* 如果内层事务显式打上了rollBackOnly的标记,最终全事务一定是回滚掉的。
*
* 但如果没有被打上rollBackOnly标记,则globalRollbackOnParticipationFailure开关就很重要了。
* globalRollbackOnParticipationFailure开关默认是开启的,也就是说内层事务挂了,最终的结果只能是全事务回滚。
* 但如果globalRollbackOnParticipationFailure开关被关闭的话,内层事务挂了,外层事务业务方法中可以根据情况控制是否回滚。
*/
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 由具体TxMgr子类实现回滚。
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
catch (RuntimeException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
// 回调transaction synchronization对象的afterCompletion方法。
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
// 后续工作:status标记completed,在最外层清空transaction synchronization集合,恢复挂起事务资源等等。
cleanupAfterCompletion(status);
}
}
4. 案例分析
4.1 进不了事务
这个问题也是使用Spring声明式事务很常见的问题。 首先保证配置都是正确的,并且开启了Spring事务(比如@EnableTransactionManagement)。
要明白进事务的本质就是进到事务切面的代理方法中,最常见的是同一个类的非事务方法调用一个加了事务注解的方法没进入事务。 我们以cglib代理为例,由于Spring的对于cglib AOP代理的实现,进入被代理方法的时候实际上已经离开了“代理这一层壳子”,可以认为代码走到的是一个朴素的bean,调用同一个bean中方法自然与代理没有半毛钱关系了。 一般对于声明式事务都是以调用另一个类的加了@Transactional注解的public方法作为入口的。
4.2 异常捕捉后没返回值并且抛出UnexpectedRollbackException
这个案例是工作中同事让我看的一个线上真实案例。 下面是我将原问题用简化代码的形式模拟业务场景的简单demo,只有core Java代码和Spring相关注解的可运行代码。
首先创建一个表。 create table t (value varchar(20));
新建三个类, ServiceA, ServiceB和Response。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 模拟加了事务注解的外层service。
*/
@Service
public class ServiceA {
@Autowired
private DataSource dataSource;
@Autowired
private ServiceB serviceB;
@Transactional
public Response insert() {
executeSql("insert into t select 'serviceA 开始'");
try {
serviceB.insert();
} catch (Exception e) {
System.out.println("serviceB#insert挂了,原因: " + e);
return Response.FAIL;
}
return Response.SUCC;
}
private void executeSql(String sql) {
Connection connection = DataSourceUtils.getConnection(dataSource);
try {
connection.createStatement().execute(sql);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 被外层ServiceA调用的ServiceB,抛出异常模拟来模拟挂了的情况。
*/
@Service
public class ServiceB {
@Autowired
private DataSource dataSource;
// 用于控制是否模拟insert方法挂了的情况。
private boolean flag = true;
@Transactional
public void insert() {
executeSql("insert into t select '这里是ServiceB挂之前'");
if (true) {
throw new RuntimeException("模拟内层事务某条语句挂了的情况");
}
executeSql("insert into t select '这里是ServiceB挂之后'");
}
private void executeSql(String sql) {
Connection connection = DataSourceUtils.getConnection(dataSource);
try {
connection.createStatement().execute(sql);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
1
2
3
4
5
6
7
/**
* Response类。
*/
public class Response {
public static final Response SUCC = new Response();
public static final Response FAIL = new Response();
}
如果调用ServiceA#insert可以观察到控制台输出 serviceB#insert挂了,原因: java.lang.RuntimeException: 模拟内层事务挂了的情况的报错日志。 但ServiceA#insert没有返回Response.FAIL并且Spring还抛出了异常 org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only
观察MySQL通用日志,结果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
2017-10-03T12:46:21.791139Z 382 Connect root@localhost on test using SSL/TLS
2017-10-03T12:46:21.796530Z 382 Query /* mysql-connector-java-5.1.42 ( Revision: 1f61b0b0270d9844b006572ba4e77f19c0f230d4 ) */SELECT @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_buffer_length AS net_buffer_length, @@net_write_timeout AS net_write_timeout, @@query_cache_size AS query_cache_size, @@query_cache_type AS query_cache_type, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@tx_isolation AS tx_isolation, @@wait_timeout AS wait_timeout
2017-10-03T12:46:21.821206Z 382 Query SET character_set_results = NULL
2017-10-03T12:46:21.821757Z 382 Query SET autocommit=1
2017-10-03T12:46:21.826013Z 382 Query SET autocommit=0
2017-10-03T12:46:21.837792Z 382 Query select @@session.tx_read_only
2017-10-03T12:46:21.840326Z 382 Query insert into t select 'serviceA 开始'
2017-10-03T12:46:21.850478Z 382 Query select @@session.tx_read_only
2017-10-03T12:46:21.853736Z 382 Query insert into t select '这里是ServiceB挂之前'
2017-10-03T12:46:21.854520Z 382 Query rollback
2017-10-03T12:46:21.855058Z 382 Query SET autocommit=1
2017-10-03T12:46:21.855514Z 382 Query select @@session.tx_read_only
2017-10-03T12:46:21.856284Z 382 Quit
可以很清楚的看到整个事务最终被回滚掉了, ServiceB#insert并没有执行insert into t select '这里是ServiceB挂之后'。 其实对于Spring事务来说,这样的结果是正确的,但对于开发者来说,这个结果确实看似有些“不能理解”。
我们不妨来分析一下原因: 首先ServiceB#insert本身是直接抛出RuntimeException的,那么退栈到事务切面后,事务切面会发现需要回滚但因为ServiceB#insert还不是事务的最外层边界,所以在AbstractPlatformTransactionManager#processRollback方法仅仅会调用doSetRollbackOnly(status);,子类DataSourceTransactionManager会拿出TxStatus中的transaction对象打上回滚标记,具体来说就是transaction对象(对于DataSourceTransactionManager来说类型是DataSourceTransactionObject)会取出ConnectionHolder,调用setRollbackOnly。我们知道这样就相当于标记是一个全局的标记了,因为只要是隶属于同一个物理事务的Spring事务都能够读到同一个ConnectionHolder。
好了,接下来到了ServiceA在catch块准备返回Response.FAIL的时候,退栈到事务切面,在AbstractPlatformTransactionManager#commit方法读到if(!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly())条件成立,接下来调用processRollback,由于在事务最外层边界会物理回滚掉,并且也正是到了事务最外层边界,Spring抛出UnexpectedRollbackException。
至此原因已经分析完毕。
那么问题怎么解决呢,这个问题有好几种解决办法,但是得根据具体情况决定。 第一种: 根据实际代码与业务情况看看ServiceB#insert是否有必要加事务。如果不加事务的话,其实就事务角度来分析,ServiceB#insert相当于被内联到了ServiceA#insert中。就上面的示例而言,如果我们把ServiceB#insert的事务注解拿掉,则事务是可以顺利提交的,Spring也不会抛出UnexpectedRollbackException。但是ServiceB#insert实际上并没有完整执行,所以这样的解决思路很容易导致出现不完整的脏数据。当然还是要看具体业务需求,如果可以接受的话也无所谓。
第二种:手动控制是否回滚。如果不能接受ServiceB挂的话,可以在catch块里加上TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();用于显式控制回滚。这样Spring就明白你自己要求回滚事务,而不是unexpected了。Spring也不会抛出UnexpectedRollbackException了。那么如果在ServiceA中捕获到异常,真的就是不想回滚,即便ServiceA发生了异常,也想要最终提交整个事务呢?如果有这样的需求的话,可以给TxMgr配置一个参数setGlobalRollbackOnParticipationFailure(false);。这样只要没有显式在代码里通过给TxStatus设置回滚标记,Spring在内层事务挂了的情况,不会去给该事务打上需要回滚的标记。换句话说,这时候处于完全手动挡来控制内层事务挂了的情况到底整个事务的提交/回滚了。
第三种:继续向上抛出异常。可以考虑继续向上抛出异常,一般在web项目都会配置一个大的异常处理切面,统一返回失败Response。Service层不需要考虑Response。因此可以考虑重构掉ServiceA的方法签名,改为void,不关心Response。也不用去捕捉ServiceB的异常了。
5. 总结
本文主要以Spring声明式事务为切入点,介绍了Spring事务的实现原理与源码。由于在前文的套路简介中也以文字描述了Spring声明式事务的大致套路,这里不再赘述。这里顺便提一句,阅读Spring源码除了看注释,调试代码,其实很个东西很容易忽视——Spring打印log的语句,那些语句的内容很多时候都是很有启发的,会让你突然明白整个分支逻辑到底是想干什么。
这里再整理一下整个事务切面的流程:
- 调用被事务增强的方法,进入事务切面。
- 解析事务属性,调用具体的TxMgr负责生成TxStatus。
- 如果当前已经有事务,进入step 5。
- 根据事务传播行为,校验是否需要抛出异常(如MANDATORY),或者挂起事务信息(由于没有真正的物理事务,所以没有需要挂起的事务资源)并创建事务(REQUIRED, REQUIREDS_NEW, NESTED等),又或者创建一个空事务(SUPPORTS, NOT_SUPPORTED, NEVER等)。进入step 6。
- 根据事务传播行为,抛出异常(NEVER),或者挂起事务资源与信息并根据情况决定是否创建事务(NOT_SUPPORTED, REQUIRES_NEW等),或者根据情况处理嵌套事务(NESTED)或者加入已有事务(SUPPORTS, REQUIRED, MANDATORY等)。
- 生成TxInfo并绑定到线程。
- 回调MethodInterceptor,事务切面前置工作至此完成。
- 如果发生异常进入step 10。
- 根据TxStatus是否被标记回滚,事务本身是否被标记回滚等决定是否要进入处理回滚的逻辑。只有在某事务最外层边界,才可能进行物理提交/回滚,否则最多就在需要回滚的情况下给事务打标需要回滚,不会有真正的动作。并且一般情况下,如果在事务最外边界发现事务需要回滚,会抛出UnexpectedRollbackException。其余情况进入step 11。
- 根据异常情况与事务属性判断异常是否需要进入处理回滚的逻辑还是进入处理提交的逻辑。如果需要回滚则根据是否到了某事务最外层边界决定是否进行物理回滚,否则给事务打标需要回滚。如果进入处理提交逻辑则同样只有在事务最外层边界才可能有真正的物理提交动作。
- 无论是否发生异常,都会恢复TxInfo为前一个事务的TxInfo(类似于弹栈)。
下面总结一下Spring事务控制的一些重要参数。掌握这些参数可以更灵活地配置TxMgr。
几个重要参数
Spring事务在控制提交和回滚中用了不少判断条件,了解其中一些关键参数的含义对debug问题很有帮助。下文描述的一些控制参数的默认是指在AbstractPlatformTransactionManager中的默认值。
- transactionSynchronization 在Spring事务的源码实现中,将synchronization与transaction的边界划分分离开来。可以在DefaultTransactionStatus类中看到newTransaction和newSynchronization是两个独立的边界控制参数。它有三种取值:
- SYNCHRONIZATION_ALWAYS(0) 这是默认取值。在默认取值的情况下,在任何情况下都会开启资源与信息的同步维护。举个例子:这样的话即便我们进入了NOT_SUPPORTED传播行为的方法,通过
TransactionSynchronizationManager.getCurrentTransactionName()也能拿到当前的事务名(为此NOT_SUPPORTED事务方法名)。同理,在没有事务的情况下进入SUPPORTS传播行为的方法也能够读到当前事务名currentTransactionName。 - SYNCHRONIZATION_ON_ACTUAL_TRANSACTION(1) 此取值保证只维护存在实际物理事务的事务方法的信息与资源。举个例子:在没有事务情况下进入SUPPORTS传播行为的方法,通过TransactionSynchronizationManager拿到的currentTransactionName一定是null。但如果取值为SYNCHRONIZATION_ALWAYS就会不同,如果不存在实际物理事务则拿到的事务名是SUPPORTS方法名,如果存在实际物理事务则能够拿到创建该实际物理事务的方法名。
- SYNCHRONIZATION_NEVER(2) 这个取值会导致在任何情况下TxStats的isNewSynchronization方法永远返回false。这样的话像currentTransactionName, currentTransactionIsolationLevel以及transaction synchronization都是没办法与事务生命周期同步维护的。
- SYNCHRONIZATION_ALWAYS(0) 这是默认取值。在默认取值的情况下,在任何情况下都会开启资源与信息的同步维护。举个例子:这样的话即便我们进入了NOT_SUPPORTED传播行为的方法,通过
- nestedTransactionAllowed 默认为false。这个和具体TxMgr子类实现有关。这个开关的作用是是否允许嵌套事务,如果不允许的话如果有NESTED传播行为事务试图加入已有事务会抛出NestedTransactionNotSupportedException。
- validateExistingTransaction 默认为false。也就是在内层事务(比如REQUIRED/SUPPORTS传播行为)加入外层事务的时候不会做校验。如果开启的话会去校验内外层事务隔离级别,是否只读等。如果有冲突会抛出IllegalTransactionStateException。
- globalRollbackOnParticipationFailure 默认为true。在内层事务(比如REQUIRED/SUPPORTS传播行为)加入外层事务后,如果内层事务挂了,会给事务打上回滚的全局标记,这个事务最终只能被回滚而不能被提交。 如果把开关置为false的话,我们在应用层捕捉到内层事务的异常后,可以自己决定到底要不要回滚(要回滚的话具体实现可以继续抛异常或者给事务状态setRollbackOnly手动标记回滚)。
- failEarlyOnGlobalRollbackOnly 默认为false。也就是在最外层事务边界尝试提交时发现事务需要回滚,在物理回滚完后,会抛出UnexpectedRollbackException,而内层事务发现事务需要回滚时,仅仅只会调用processRollback给事务标记回滚。 如果这个开关打开的话,在某个内层事务尝试提交时发现事务需要回滚后,在调用processRollback给事务打上回滚的全局标记后,就会直接立刻抛出UnexpectedRollbackException。外层事务代码只要没捕获到异常,后续代码就不会继续执行,退栈到事务切面中进行物理回滚。 这个开关打开的一个好处是可以避免内层事务挂了,外层捕获异常,但我们期望整个事务最终还是需要回滚的情况下,避免外层事务继续执行语句,否则可能会执行了一大堆的sql最终还是回滚。
- rollbackOnCommitFailure 默认为false。如果开启的话在事务真正物理提交doCommit失败后会进行回滚。
- shouldCommitOnGlobalRollbackOnly() 与其他参数不同, shouldCommitOnGlobalRollbackOnly是作为一个boolean方法可以被TxMgr具体实现类覆盖的。默认为false。它用于控制一个事务对象被打上需要回滚的全局标记后,是否需要回滚。如果置为true的话,会去尝试提交事务。需要注意的是即便如此,如果TxMgr子类的doCommit方法实现中没有一些对这种被打上回滚标记的事务的处理逻辑,例如抛出异常之类的,Spring最终还是会抛出UnexpectedRollbackException。我们常见使用的DataSourceTransactionManager一般不会去把这个参数改为true。
6. 一个小插曲
在阅读源码的过程中,避免不了阅读Java Doc注释,发现有一处写了个”PROPAGATION_REQUIRES”,按照TransactionDefinition中定义的常量应该是”PROPAGATION_REQUIRED”以及”PROPAGATION_REQUIRES_NEW”,前者是”REQUIRED”而不是”REQUIRES”。我用正则在Spring全源码中搜了一下把”PROPAGATION_REQUIRED”写成”PROPAGATION_REQUIRES”的有三处,然后给Spring发了一个PR。已经合入Spring主干。
7. 参考
Spring 4.3.5 源码 StackOverflow
