0x01:事务未释放导致锁表


回顾:


原代码结构

public void methond01 (List list1) {
  for (Object o : list1) {
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    try {
      ...
      if (...) {
        sendMail();
        continue;
      }
      ...
      delete();  
      insert();
      dataSourceTransactionManager.commit(transactionStatus);
    }catch (Exception e) {
      dataSourceTransactionManager.rollback(transactionStatus);
    }
  }
}


报错:

具体报错已无日志可查了(糟糕执行力的具体表现:问题已过了2周了),大意便是delete方法提示获取锁超时。

处理:

因为提示的是获取锁超时,即表明在delete方法执行时,目标表上存在事务,且未释放。

查看当前事务及对应的链接信息:

select 
	* 
FROM 
	information_schema.INNODB_TRX a 
	inner join information_schema.`PROCESSLIST` b on a.trx_mysql_thread_id = b.ID 
;


发现只有一个十几分钟前开启的事务,对应链接的info信息为空,且该事务的开启时间恰好等于第一次执行该方法的时间。


补充:

  1. 该方法是在同步定时任务中,第一次执行完发现数据库只同步了3条数据,并发了3封邮件。(xxjob的调度等了很久才返回成功?)
  2. 过了十几分钟,再次执行,便出现上述报错。

判断:

因为没有更多信息判断该事务记录涉及的表,但开始时间是完全一致的,故假定该事务没有关闭,一直锁表,后续再跑时,无法删除数据。

问题的原因便是上述的continue,开启了事务,走了continue,没有commit/rollback,修改就很简单了,continue移动到事务开启之前即可。

修改后再同步,最终同步了100多条数据,3条走了continue,发了邮件。

What`s more?

Question:

那个未被释放的事务是如何产生的呢?

1.事务是针对链接的,mysql的innodb引擎支持事务,项目数据库的隔离级别为 READ COMMITTED;

mysql层面不支持嵌套事务:当执行第二个begin时,第一个事务已经被提交了。

begin;
select * from maindata_win_sellout_initial a where a.id in (442147,442148) for update ;
# 此时存在事务
begin;
#此时之前的事务已提交了
select * from maindata_win_sellout_initial a where a.id = 442149 for update ;
#此时存在新的定时任务
rollback;
#此时已不存在定时任务
commit;


Tips1: begin后要执行具体的语句才会开启事务。

Tips2: mysql有 savepoint和rollback to。

2.此时spring的隔离级别为默认,事务传播级别为默认(REQUIRED);

提出假设:


前面三条数据,正常开启事务并提交了,所以那三条可见。后面三条continue,开启事务,陆续走了continue,因为默认传播等级,故事务都是使用的第4条数据开启的事务(a),后续7~n条正常的数据也是使用的事务a,后续的commit对事务a无效,故相当于我们开启了事务a执行了大量的操作,但没有提交。
该假设满足:3条数据可见,3封邮件,后续数据不可见。再次开启时,因为事务a未释放(没有索引锁了整个表),导致无法后续获取锁超时。

验证:

后续的commit没有对之前的事务生效?

public void testTransaction() {
        for (int i = 2; i < 10; i++) {
            //手动开启事务
            TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);

            try {
                if (i == 5) {
                    //自5之后都是同一个连接
                    continue;
                }

                testtranscationMapper.insert(new Testtranscation("code"+i, "value"+i));

                dataSourceTransactionManager.commit(transactionStatus);
            } catch (Exception e) {
                dataSourceTransactionManager.rollback(transactionStatus);
            }
        }
    }


1.创建事务时,会依据传播类型+是否已存在连接,来决定是新开事务还是用以前的。

2.如果是用以前的,commit方法不会提交。
image-20240617170803092
这里的核心要点便是commit方法,如果判断该事务不是新事务,不会执行下面的doCommit 后续所有的操作都不会commit。自然外界的查询也不会查到,只有手动关闭连接/服务断开,才会释放锁。


后续疑问:

  • spring的事务7种传播的实现原理?(mysql是没有这些的,尤其是嵌套及创建新的,是基于savepoint实现的吗?– 是的)基于最小活动原则,后面碰到了再细究类型,反正利用了mysql提供的savepoint逻辑。
  • spring是基于jdbc的封装吗? 是的
  • spring的整个数据库连接管理的框架及实现原理?
  • 应用如何维持在执行完后不会关闭?
  • 探究源码的过程中,TransactionSynchronization,事务拓展,譬如事务提交后进行操作,类似bean的前后处理!哇,背后相同的思想。

对于上面的问题,首先看下事务这一块儿的源码:

获取事务:

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {

		// Use defaults if no transaction definition given.
		TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
		//note1:获取事务
		Object transaction = doGetTransaction();
		boolean debugEnabled = logger.isDebugEnabled();

		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
      //note2:如果是已经存在的事务走这里
			return handleExistingTransaction(def, transaction, debugEnabled);
		}

		// Check definition settings for new transaction.
		if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
		}

		// No existing transaction found -> check propagation behavior to find out how to proceed.
		if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
			}
			try {
        //note3:默认传播类型,走这里
				return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + def);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
		}
	}


Note1:核心方法是获取连接

private static Object doGetResource(Object actualKey) {
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
  	//获取线程变量里面的连接holder,第一次,前面的null就返回了
  	//null 会在后续dobegin时建立新的连接,同时绑定资源(写到resources里)
		Object value = map.get(actualKey);
		// Transparently remove ResourceHolder that was marked as void...
		if (value instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) {
			map.remove(actualKey);
			// Remove entire ThreadLocal if empty...
			if (map.isEmpty()) {
				resources.remove();
			}
			value = null;
		}
		return value;
	}


Note3:开启事务,核心方法是doBegin

protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
			if (!txObject.hasConnectionHolder() ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
				Connection newCon = obtainDataSource().getConnection();
				if (logger.isDebugEnabled()) {
					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
				}
        //没有连接会在此处新建一个连接
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}

			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();

			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			txObject.setPreviousIsolationLevel(previousIsolationLevel);
			txObject.setReadOnly(definition.isReadOnly());

			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
			// so we don't want to do it unnecessarily (for example if we've explicitly
			// configured the connection pool to set it already).
			if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				if (logger.isDebugEnabled()) {
					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
				}
				con.setAutoCommit(false);
			}

			prepareTransactionalConnection(con, definition);
			txObject.getConnectionHolder().setTransactionActive(true);

			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}

			// Bind the connection holder to the thread.
			if (txObject.isNewConnectionHolder()) {
        //将新的连接绑定到线程变量resource里
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			if (txObject.isNewConnectionHolder()) {
				DataSourceUtils.releaseConnection(con, obtainDataSource());
				txObject.setConnectionHolder(null, false);
			}
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}


Note2 :如果是加入的以前的连接,会依据不同的事务传播类型,进行不同的操作。不再细数。

总之,会依据传播类型,决定是新建一个事务,还是加入以前的事务,而事务是针对连接的。

事务提交:


事务提交主要是一堆的监听器,核心便是isNewTransaction 才会提交

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;
			boolean commitListenerInvoked = false;

			try {
				boolean unexpectedRollback = false;
				prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;

				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Releasing transaction savepoint");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
					commitListenerInvoked = true;
					status.releaseHeldSavepoint();
				}
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction commit");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
					commitListenerInvoked = true;
          //只有判断为新事务,才会提交!!!
					doCommit(status);
				}
				else if (isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = status.isGlobalRollbackOnly();
				}

				// Throw UnexpectedRollbackException if we have a global rollback-only
				// marker but still didn't get a corresponding exception from commit.
				if (unexpectedRollback) {
					throw new UnexpectedRollbackException(
							"Transaction silently rolled back because it has been marked as rollback-only");
				}
			}
			catch (UnexpectedRollbackException ex) {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
				this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
				throw ex;
			}
			catch (TransactionException ex) {
				if (isRollbackOnCommitFailure()) {
					doRollbackOnCommitException(status, ex);
				}
				else {
					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
					if (commitListenerInvoked) {
						this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, ex));
					}
				}
				throw ex;
			}
			catch (RuntimeException | Error ex) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, ex);
				throw ex;
			}

			// Trigger afterCommit callbacks, with an exception thrown there
			// propagated to callers but the transaction still considered as committed.
			try {
				triggerAfterCommit(status);
			}
			finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
				if (commitListenerInvoked) {
					this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, null));
				}
			}

		}
		finally {
			cleanupAfterCompletion(status);
		}
	}


事务回滚:


相似逻辑,只不过commit只有那一个地方,rollback这可能会在多个地方调用,进行回滚。

还有一些拓展没解决,不过那些拓展可能更适合新开一个主题,对于这一个问题,相对较为清晰了,但目前我们只要了解的是基于手动开启事务的部分,对于模板与注解的逻辑尚未涉及。