
Abstract: originally published at https://zhuanlan.zhihu.com/p/54815876 by 「yuanxiang」. Reposts are welcome; please keep this abstract. Thanks!



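AsyncWorker currently implements only branchCommit, which asynchronously deletes the undo log records after a branch transaction has committed. The branchRollback interface is not implemented yet.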

private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.size() == 0) {
        return;
    }
    // Drain the buffer, grouping the pending commits by resource ID.
    Map<String, List<Phase2Context>> mappedContexts = new HashMap<>();
    Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();
    while (iterator.hasNext()) {
        Phase2Context commitContext = iterator.next();
        List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
        if (contextsGroupedByResourceId == null) {
            contextsGroupedByResourceId = new ArrayList<>();
            mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
        }
        contextsGroupedByResourceId.add(commitContext);
        iterator.remove();
    }

    // For each resource, open one plain connection and delete its undo logs.
    for (String resourceId : mappedContexts.keySet()) {
        Connection conn = null;
        try {
            try {
                DataSourceProxy dataSourceProxy = DataSourceManager.get().get(resourceId);
                conn = dataSourceProxy.getPlainConnection();
            } catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle);
                continue;
            }

            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(resourceId);
            for (Phase2Context commitContext : contextsGroupedByResourceId) {
                try {
                    UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                } catch (Exception ex) {
                    // A failed delete is only logged; it does not stop the remaining deletes.
                    LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
                }
            }
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                }
            }
        }
    }
}
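The buffer-then-group step above can be illustrated in isolation. The following is a minimal sketch, not the Fescar implementation: `CommitContextSketch` and `CommitBufferSketch` are hypothetical names, and a `LinkedBlockingQueue` is assumed as the buffer type. It shows phase two merely enqueueing a context, and a later drain grouping the contexts by resource ID so that one connection per resource could serve all deletes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a buffered phase-two commit context. */
final class CommitContextSketch {
    final String xid;
    final long branchId;
    final String resourceId;

    CommitContextSketch(String xid, long branchId, String resourceId) {
        this.xid = xid;
        this.branchId = branchId;
        this.resourceId = resourceId;
    }
}

/** Hypothetical buffer: commits are enqueued now and processed in batches later. */
final class CommitBufferSketch {
    private final BlockingQueue<CommitContextSketch> buffer = new LinkedBlockingQueue<>();

    /** Phase two only enqueues, so the caller can return immediately. */
    boolean offer(CommitContextSketch context) {
        return buffer.offer(context);
    }

    /** Removes everything currently buffered and groups it by resource ID. */
    Map<String, List<CommitContextSketch>> drainGroupedByResource() {
        List<CommitContextSketch> drained = new ArrayList<>();
        buffer.drainTo(drained);
        Map<String, List<CommitContextSketch>> grouped = new LinkedHashMap<>();
        for (CommitContextSketch context : drained) {
            grouped.computeIfAbsent(context.resourceId, id -> new ArrayList<>()).add(context);
        }
        return grouped;
    }

    int size() {
        return buffer.size();
    }
}
```

Grouping first means a batch touching two databases needs two connections rather than one per branch.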

ConnectionProxy.commit calls branchRegister when it commits; the call is made inside the register() method invoked below:

public void commit() throws SQLException {
    if (context.inGlobalTransaction()) {
        try {
            // Register the branch with the server before the local commit.
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }

        try {
            // Write the undo log in the same local transaction as the business data.
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            // Report the failed first phase to the server.
            report(false);
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    } else {
        targetConnection.commit();
    }
}

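DataSourceManager is the RM (resource manager) implementation.

branchRegister registers the branch transaction with the server through the RPC client.

On the server side, DefaultCoordinator handles the branch registration: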

@Override
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException {
    response.setTransactionId(request.getTransactionId());
    response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
        XID.generateXID(request.getTransactionId()), request.getLockKey()));
}

DefaultCore.branchRegister

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException {
    // The global transaction must exist and still be in the Begin state.
    GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);

    BranchSession branchSession = new BranchSession();
    branchSession.setTransactionId(XID.getTransactionId(xid));
    branchSession.setBranchId(UUIDGenerator.generateUUID());
    branchSession.setApplicationId(globalSession.getApplicationId());
    branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
    branchSession.setBranchType(branchType);
    branchSession.setResourceId(resourceId);
    branchSession.setLockKey(lockKeys);
    branchSession.setClientId(clientId);

    // Acquire the locks for the branch's lock keys; a conflict fails the registration.
    if (!branchSession.lock()) {
        throw new TransactionException(LockKeyConflict);
    }
    try {
        globalSession.addBranch(branchSession);
    } catch (RuntimeException ex) {
        throw new TransactionException(FailedToAddBranch);
    }
    return branchSession.getBranchId();
}
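The article does not show how branchSession.lock() decides that a lock key conflicts. As a rough illustration of the idea only, the sketch below assumes a hypothetical in-memory lock table (`RowLockTableSketch`, not a Fescar class) that maps each row key to the transaction holding it and grants either all requested keys or none.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical all-or-nothing row lock table keyed by "table:pk" style strings. */
final class RowLockTableSketch {
    private final Map<String, Long> ownerByRowKey = new HashMap<>();

    /**
     * Tries to lock every key for the given transaction.
     * On the first key held by another transaction, the keys taken
     * during this call are released again and false is returned.
     */
    synchronized boolean tryLockAll(long transactionId, List<String> rowKeys) {
        List<String> newlyLocked = new ArrayList<>();
        for (String key : rowKeys) {
            Long owner = ownerByRowKey.putIfAbsent(key, transactionId);
            if (owner == null) {
                newlyLocked.add(key);
            } else if (owner != transactionId) {
                for (String locked : newlyLocked) {
                    ownerByRowKey.remove(locked);
                }
                return false;
            }
        }
        return true;
    }

    /** Releases every key held by the transaction, e.g. when it finishes. */
    synchronized void releaseAll(long transactionId) {
        ownerByRowKey.values().removeIf(owner -> owner == transactionId);
    }
}
```

In this sketch a `false` result corresponds to the LockKeyConflict case above: the branch is rejected before it is added to the global session.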

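The branch is thereby registered under the global transaction.

Commit process:

Taking executeUpdate as an example of the first-phase commit, the entry point is: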

public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, new StatementCallback<Integer, PreparedStatement>() {
        @Override
        public Integer execute(PreparedStatement statement, Object... args) throws SQLException {
            return statement.executeUpdate();
        }
    });
}

ExecuteTemplate.execute creates a different executor depending on the type of SQL statement:

public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {

    if (!RootContext.inGlobalTransaction()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(
            statementProxy.getTargetSQL(),
            statementProxy.getConnectionProxy().getDbType());
    }
    // Pick the executor that matches the recognized SQL type.
    Executor<T> executor = null;
    switch (sqlRecognizer.getSQLType()) {
        case INSERT:
            executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case UPDATE:
            executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case DELETE:
            executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case SELECT_FOR_UPDATE:
            executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
            break;
        default:
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
    }
    T rs = null;
    try {
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (ex instanceof SQLException) {
            throw (SQLException) ex;
        } else {
            // Turn everything into SQLException.
            // The listing as quoted omitted "throw" here, which silently swallowed the exception.
            throw new SQLException(ex);
        }
    }
    return rs;
}

Taking UpdateExecutor as an example, UpdateExecutor.execute goes through the template class and ends up calling the following method:

public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

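The implementation captures the data images before and after the change, inserts them into the undo log, and commits. The undo log data is used only on rollback, where it is turned into the SQL statements that undo the change.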

protected T executeAutoCommitFalse(Object[] args) throws Throwable {
    // Snapshot the affected rows before and after running the business SQL.
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    statementProxy.getConnectionProxy().prepareUndoLog(sqlRecognizer.getSQLType(), sqlRecognizer.getTableName(), beforeImage, afterImage);
    return result;
}
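To make the role of the before image concrete, here is a minimal sketch of how a rollback statement for an UPDATE could be derived from one before-image row. It is an illustration under assumptions, not Fescar's undo executor: `UndoSqlSketch` is a hypothetical class, the row is modeled as a plain column-to-value map, and a single-column primary key is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical holder for a parameterized rollback statement. */
final class UndoSqlSketch {
    final String sql;
    final List<Object> parameters;

    private UndoSqlSketch(String sql, List<Object> parameters) {
        this.sql = sql;
        this.parameters = parameters;
    }

    /**
     * Builds "UPDATE table SET col = ?, ... WHERE pk = ?" that restores
     * the values captured in the before image of a single row.
     */
    static UndoSqlSketch forUpdate(String table, String pkColumn, Map<String, Object> beforeRow) {
        if (!beforeRow.containsKey(pkColumn)) {
            throw new IllegalArgumentException("before image lacks primary key column " + pkColumn);
        }
        StringBuilder setClause = new StringBuilder();
        List<Object> parameters = new ArrayList<>();
        for (Map.Entry<String, Object> column : beforeRow.entrySet()) {
            if (column.getKey().equals(pkColumn)) {
                continue; // the key identifies the row; it is not restored
            }
            if (setClause.length() > 0) {
                setClause.append(", ");
            }
            setClause.append(column.getKey()).append(" = ?");
            parameters.add(column.getValue());
        }
        parameters.add(beforeRow.get(pkColumn));
        String sql = "UPDATE " + table + " SET " + setClause + " WHERE " + pkColumn + " = ?";
        return new UndoSqlSketch(sql, parameters);
    }
}
```

A real rollback would presumably also compare the current row with the after image before overwriting it, so that a later change by another writer is detected rather than lost.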

Rollback was covered in the previous article, so it is not repeated here.

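Because an AT transaction has already committed locally in the first phase, the second-phase commit only has AsyncWorker delete the undo log asynchronously; the real commit is completed in the first phase.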

public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId, applicationData);
}

TableMetaCache:

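This class holds the cached TableMeta. One small issue: if the table metadata changes and the application is not restarted, the cached entry only expires after 15 minutes by default. This should be rare. Normally a change to the table metadata comes with a code change that requires redeploying the application anyway; otherwise, the database change has to be checked for compatibility with the running business code.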
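The staleness window described above follows directly from time-based expiry. The sketch below is a generic expiring cache written for illustration; `ExpiringCacheSketch` is a hypothetical class and says nothing about how TableMetaCache is actually built. The clock is injected so that the 15-minute behavior can be exercised without waiting.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical cache whose entries are reloaded once they are older than a fixed TTL. */
final class ExpiringCacheSketch<K, V> {

    private static final class Entry<T> {
        final T value;
        final long loadedAtMillis;

        Entry(T value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    ExpiringCacheSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value, reloading it when absent or older than the TTL. */
    V get(K key, Function<K, V> loader) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.get(key);
        if (entry == null || now - entry.loadedAtMillis >= ttlMillis) {
            entry = new Entry<>(loader.apply(key), now);
            entries.put(key, entry);
        }
        return entry.value;
    }

    /** Drops one entry so that the next get reloads it immediately. */
    void invalidate(K key) {
        entries.remove(key);
    }
}
```

With a 15-minute TTL, a schema change made one minute after a load stays invisible for another 14 minutes unless the entry is invalidated explicitly or the process restarts, which is the window the paragraph above describes.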
