【spring】事务提交后进行某些操作

在工作中,我们经常会有这样的需求。当修改了某些东西后,需要通知其他服务模块或系统进行某些处理。
这个如果硬编码的话,是一种很不优雅的方法:
首先,它的灵活性很差,当需求变更时,无可避免的要修改业务操作的代码,容易出bug,不符合开关原则(虽然不是讨论设计模式)。
其次,主业务操作(数据修改)部分和后续的通知处理应该是隔离的,后续的通知是否成功,对业务操作都不应有影响,或者在功能和时间上尽可能不影响主业务操作。

那么如果不用硬编码,事件(消息)驱动的方式是常用的一种方法,这样我们就可以把后续通知的相关处理和主业务操作解耦开来,就算要进行拓展也是很安全和方便的。

spring 的 TransactionSynchronizationManager

在spring中,TransactionSynchronizationManager有一个事务提交后的回调支持,我们可以视线注册需要在事务commit后才进行的某些操作,如afterCommit()

// 这里是上下文
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void suspend() {
}

@Override
public void resume() {
}

@Override
public void flush() {
}

@Override
public void beforeCommit(boolean readOnly) {
}

@Override
public void beforeCompletion() {
}

@Override
public void afterCommit() {
log.info('afterCommit()');
}

@Override
public void afterCompletion(int status) {
}
});

因为TransactionSynchronization是interface,所以TransactionSynchronization的实现是一个匿名内部类,相当于一个闭包,可以访问上下文的变量等,方便我们进行某些操作。

运行细节 ( 源码 )

接下来看一下TransactionSynchronizationManager的实现,它的成员中,有个ThreadLocal副本,保存了不同线程已注册的同步操作:

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");

在spring运行过程中,事务相关的操作会通过TransactionInterceptor执行invocation:

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}


protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {
...
...
commitTransactionAfterReturning(txInfo);
return retVal;
}
}
}

invokeWithinTransaction()是父类TransactionAspectSupport的方法,并会调用commitTransactionAfterReturning(TransactionInfo txInfo))来提交事务:

   // TransactionInterceptor.java

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
// AbstractPlatformTransactionManager.java

@Override
public final void commit(TransactionStatus status) throws TransactionException {
...
processCommit(defStatus);
}

processCommit()这个方法中很详细的介绍了提交事务的处理细节,包括beforeCommit() / beforeCompletion() / afterCommit() / afterCompletion()在事务提交时的调用和发生异常时的处理方法,还可以看到如isNewTransaction、globalRollbackOnly等对事务处理的影响。

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, err);
throw err;
}

// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}

}
finally {
cleanupAfterCompletion(status);
}
}

可以看到triggerAfterCommit()在doCommit(status),不管是否抛出异常都会执行afterCompletion(),这个细节在平时开发也要留意,做好异常的处理。

   private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
if (status.isDebug()) {
logger.trace("Triggering afterCommit synchronization");
}
TransactionSynchronizationUtils.triggerAfterCommit();
}
}

终于,前面我们注册在TransactionSynchronizationManager的TransactionSynchronization,在这里被get出来执行,如果注册了多个通知操作,会被按顺序执行,因为这个Set的实现是LinkedHashSet。

   // TransactionSynchronizationUtils
public static void triggerAfterCommit() {
invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}

public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
synchronization.afterCommit();
}
}
}

至今为止,以上讨论的是弱一致性的业务场景,一致性要求比较高的,需要分布式事务或者其他的手段来实现。
而且需要注意的,afterCommit()中的操作是同步的,和业务操作在同一个响应时间内的,所以,尽量不要做一些耗时的操作。
afterCommit()中,除了直接发送消息到队列,也可以使用本地队列来优化,用来存储和异步发送消息,这样会快很多。