diff --git a/pom.xml b/pom.xml index 1ccf1deec..e028ca85e 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 1.8.3 2.7.15 3.13.2.Final - 1.4.2 + 1.5.1 1.1.8 4.2.9 diff --git a/src/main/java/io/jboot/support/seata/interceptor/SeataGlobalTransactionHandler.java b/src/main/java/io/jboot/support/seata/interceptor/SeataGlobalTransactionHandler.java index 08ff68728..0452515ca 100644 --- a/src/main/java/io/jboot/support/seata/interceptor/SeataGlobalTransactionHandler.java +++ b/src/main/java/io/jboot/support/seata/interceptor/SeataGlobalTransactionHandler.java @@ -91,6 +91,9 @@ public TransactionInfo getTransactionInfo() { throw e.getCause(); case RollbackFailure: failureHandler.onRollbackFailure(e.getTransaction(), e.getCause()); + throw e.getCause(); + case RollbackRetrying: + failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException()); throw e.getCause(); default: throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code); diff --git a/src/main/java/io/jboot/support/seata/interceptor/SeataGlobalTransactionalInterceptor.java b/src/main/java/io/jboot/support/seata/interceptor/SeataGlobalTransactionalInterceptor.java index 1147b07bd..847e151e5 100644 --- a/src/main/java/io/jboot/support/seata/interceptor/SeataGlobalTransactionalInterceptor.java +++ b/src/main/java/io/jboot/support/seata/interceptor/SeataGlobalTransactionalInterceptor.java @@ -72,7 +72,7 @@ public Object execute() throws Throwable { @Override public GlobalLockConfig getGlobalLockConfig() { GlobalLockConfig config = new GlobalLockConfig(); - config.setLockRetryInternal(globalLockAnno.lockRetryInternal()); + config.setLockRetryInterval(globalLockAnno.lockRetryInternal()); config.setLockRetryTimes(globalLockAnno.lockRetryTimes()); return config; } diff --git a/src/main/java/io/jboot/support/seata/tcc/ActionInterceptorHandler.java b/src/main/java/io/jboot/support/seata/tcc/ActionInterceptorHandler.java index 8c16211e6..12cfe3c13 100644 --- a/src/main/java/io/jboot/support/seata/tcc/ActionInterceptorHandler.java +++ b/src/main/java/io/jboot/support/seata/tcc/ActionInterceptorHandler.java @@ -26,8 +26,12 @@ import io.seata.rm.DefaultResourceManager; import io.seata.rm.tcc.TCCResource; import io.seata.rm.tcc.api.BusinessActionContext; +import io.seata.rm.tcc.api.BusinessActionContextParameter; +import io.seata.rm.tcc.api.BusinessActionContextUtil; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; +import io.seata.rm.tcc.interceptor.ActionContextUtil; +import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.lang.reflect.Parameter; import java.util.HashMap; @@ -40,7 +44,6 @@ * @author zhangsen/菜农 commit: https://gitee.com/fuhai/jboot/commit/55564bfd9e6eebfc39263291d89592cd16f77498 */ public class ActionInterceptorHandler { - private static final Log LOGGER = Log.getLog(TccActionInterceptor.class); /** @@ -50,10 +53,9 @@ public class ActionInterceptorHandler { * @param arguments the arguments * @param businessAction the business action * @return map map - * @throws Throwable the throwable */ public void proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction, - Invocation invocation) { + Invocation invocation) { //TCC name String actionName = businessAction.name(); @@ -79,8 +81,30 @@ public void proceed(Method method, Object[] arguments, String xid, TwoPhaseBusin e.printStackTrace(); } actionContext.setBranchId(branchId); - - invocation.invoke(); + // save the previous action context + BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext(); + try { + //share actionContext implicitly + BusinessActionContextUtil.setContext(actionContext); + if (businessAction.useTCCFence()) { + // Use TCC Fence, and return the business result + TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName); + } + invocation.invoke(); + } finally { + try { + //to report business action context finally if the actionContext.getUpdated() is true + BusinessActionContextUtil.reportContext(actionContext); + } finally { + if (previousActionContext != null) { + // recovery the previous action context + BusinessActionContextUtil.setContext(previousActionContext); + } else { + // clear the action context + BusinessActionContextUtil.clear(); + } + } + } } /** @@ -153,6 +177,7 @@ protected void initBusinessContext(Map context, Method method, context.put(Constants.COMMIT_METHOD, businessAction.commitMethod()); context.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod()); context.put(Constants.ACTION_NAME, businessAction.name()); + context.put(Constants.USE_TCC_FENCE, businessAction.useTCCFence()); } } @@ -193,10 +218,42 @@ public void registryResource(Method m, Object interfaceClass, Class[] argument tccResource.setRollbackMethod(ReflectionUtil .getMethod(interfaceClass.getClass(), businessAction.rollbackMethod(), new Class[] {BusinessActionContext.class})); + // set argsClasses + tccResource.setCommitArgsClasses(businessAction.commitArgsClasses()); + tccResource.setRollbackArgsClasses(businessAction.rollbackArgsClasses()); + // set phase two method's keys + tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(), + businessAction.commitArgsClasses())); + tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(), + businessAction.rollbackArgsClasses())); //registry tcc resource DefaultResourceManager.get().registerResource(tccResource); } } - + protected String[] getTwoPhaseArgs(Method method, Class[] argsClasses) { + Annotation[][] parameterAnnotations = method.getParameterAnnotations(); + String[] keys = new String[parameterAnnotations.length]; + /* + * get parameter's key + * if method's parameter list is like + * (BusinessActionContext, @BusinessActionContextParameter("a") A a, @BusinessActionContextParameter("b") B b) + * the keys will be [null, a, b] + */ + for (int i = 0; i < parameterAnnotations.length; i++) { + for (int j = 0; j < parameterAnnotations[i].length; j++) { + if (parameterAnnotations[i][j] instanceof BusinessActionContextParameter) { + BusinessActionContextParameter param = (BusinessActionContextParameter)parameterAnnotations[i][j]; + String key = ActionContextUtil.getParamNameFromAnnotation(param); + keys[i] = key; + break; + } + } + if (keys[i] == null && !(argsClasses[i].equals(BusinessActionContext.class))) { + throw new IllegalArgumentException("non-BusinessActionContext parameter should use annotation " + + "BusinessActionContextParameter"); + } + } + return keys; + } } diff --git a/src/main/java/io/jboot/support/seata/tcc/JbootTCCResourceManager.java b/src/main/java/io/jboot/support/seata/tcc/JbootTCCResourceManager.java new file mode 100644 index 000000000..c7bd105bb --- /dev/null +++ b/src/main/java/io/jboot/support/seata/tcc/JbootTCCResourceManager.java @@ -0,0 +1,242 @@ +package io.jboot.support.seata.tcc; + +import com.alibaba.fastjson.JSON; +import io.seata.common.Constants; +import io.seata.common.exception.ShouldNeverHappenException; +import io.seata.common.exception.SkipCallbackWrapperException; +import io.seata.common.util.StringUtils; +import io.seata.core.exception.TransactionException; +import io.seata.core.model.BranchStatus; +import io.seata.core.model.BranchType; +import io.seata.core.model.Resource; +import io.seata.rm.AbstractResourceManager; +import io.seata.rm.tcc.TCCResource; +import io.seata.rm.tcc.TwoPhaseResult; +import io.seata.rm.tcc.api.BusinessActionContext; + +import java.lang.reflect.Method; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author zhangxn + */ +public class JbootTCCResourceManager extends AbstractResourceManager { + + /** + * TCC resource cache + */ + private Map tccResourceCache = new ConcurrentHashMap<>(); + + /** + * Instantiates a new Tcc resource manager. + */ + public JbootTCCResourceManager() { + // not do anything + } + + /** + * registry TCC resource + * + * @param resource The resource to be managed. + */ + @Override + public void registerResource(Resource resource) { + TCCResource tccResource = (TCCResource)resource; + tccResourceCache.put(tccResource.getResourceId(), tccResource); + super.registerResource(tccResource); + } + + @Override + public Map getManagedResources() { + return tccResourceCache; + } + + /** + * TCC branch commit + * + * @param branchType + * @param xid Transaction id. + * @param branchId Branch id. + * @param resourceId Resource id. + * @param applicationData Application data bind with this branch. + * @return BranchStatus + * @throws TransactionException TransactionException + */ + @Override + public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, + String applicationData) throws TransactionException { + TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId); + if (tccResource == null) { + throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId)); + } + Object targetTCCBean = tccResource.getTargetBean(); + Method commitMethod = tccResource.getCommitMethod(); + if (targetTCCBean == null || commitMethod == null) { + throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId)); + } + try { + //BusinessActionContext + BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, + applicationData); + Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext); + Object ret; + boolean result; + // add idempotent and anti hanging + if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) { + try { + result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args); + } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) { + throw e.getCause(); + } + } else { + ret = commitMethod.invoke(targetTCCBean, null); + if (ret != null) { + if (ret instanceof TwoPhaseResult) { + result = ((TwoPhaseResult)ret).isSuccess(); + } else { + result = (boolean)ret; + } + } else { + result = true; + } + } + LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId); + return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable; + } catch (Throwable t) { + String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid); + LOGGER.error(msg, t); + return BranchStatus.PhaseTwo_CommitFailed_Retryable; + } + } + + /** + * TCC branch rollback + * + * @param branchType the branch type + * @param xid Transaction id. + * @param branchId Branch id. + * @param resourceId Resource id. + * @param applicationData Application data bind with this branch. + * @return BranchStatus + * @throws TransactionException TransactionException + */ + @Override + public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, + String applicationData) throws TransactionException { + TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId); + if (tccResource == null) { + throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId)); + } + Object targetTCCBean = tccResource.getTargetBean(); + Method rollbackMethod = tccResource.getRollbackMethod(); + if (targetTCCBean == null || rollbackMethod == null) { + throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId)); + } + try { + //BusinessActionContext + BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, + applicationData); + Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext); + + Object ret; + boolean result; + // add idempotent and anti hanging + if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) { + try { + result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId, + args, tccResource.getActionName()); + } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) { + throw e.getCause(); + } + } else { + ret = rollbackMethod.invoke(targetTCCBean,args); + if (ret != null) { + if (ret instanceof TwoPhaseResult) { + result = ((TwoPhaseResult)ret).isSuccess(); + } else { + result = (boolean)ret; + } + } else { + result = true; + } + } + LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId); + return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable; + } catch (Throwable t) { + String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid); + LOGGER.error(msg, t); + return BranchStatus.PhaseTwo_RollbackFailed_Retryable; + } + } + + /** + * transfer tcc applicationData to BusinessActionContext + * + * @param xid the xid + * @param branchId the branch id + * @param resourceId the resource id + * @param applicationData the application data + * @return business action context + */ + protected BusinessActionContext getBusinessActionContext(String xid, long branchId, String resourceId, + String applicationData) { + Map actionContextMap = null; + if (StringUtils.isNotBlank(applicationData)) { + Map tccContext = JSON.parseObject(applicationData, Map.class); + actionContextMap = (Map)tccContext.get(Constants.TCC_ACTION_CONTEXT); + } + if (actionContextMap == null) { + actionContextMap = new HashMap<>(2); + } + + //instance the action context + BusinessActionContext businessActionContext = new BusinessActionContext( + xid, String.valueOf(branchId), actionContextMap); + businessActionContext.setActionName(resourceId); + return businessActionContext; + } + + /** + * get phase two commit method's args + * @param tccResource tccResource + * @param businessActionContext businessActionContext + * @return args + */ + private Object[] getTwoPhaseCommitArgs(TCCResource tccResource, BusinessActionContext businessActionContext) { + String[] keys = tccResource.getPhaseTwoCommitKeys(); + Class[] argsCommitClasses = tccResource.getCommitArgsClasses(); + return this.getTwoPhaseMethodParams(keys, argsCommitClasses, businessActionContext); + } + + /** + * get phase two rollback method's args + * @param tccResource tccResource + * @param businessActionContext businessActionContext + * @return args + */ + private Object[] getTwoPhaseRollbackArgs(TCCResource tccResource, BusinessActionContext businessActionContext) { + String[] keys = tccResource.getPhaseTwoRollbackKeys(); + Class[] argsRollbackClasses = tccResource.getRollbackArgsClasses(); + return this.getTwoPhaseMethodParams(keys, argsRollbackClasses, businessActionContext); + } + + private Object[] getTwoPhaseMethodParams(String[] keys, Class[] argsClasses, BusinessActionContext businessActionContext) { + Object[] args = new Object[argsClasses.length]; + for (int i = 0; i < argsClasses.length; i++) { + if (argsClasses[i].equals(BusinessActionContext.class)) { + args[i] = businessActionContext; + } else { + args[i] = businessActionContext.getActionContext(keys[i], argsClasses[i]); + } + } + return args; + } + + @Override + public BranchType getBranchType() { + return BranchType.TCC; + } +} diff --git a/src/main/java/io/jboot/support/seata/tcc/TCCFenceHandler.java b/src/main/java/io/jboot/support/seata/tcc/TCCFenceHandler.java new file mode 100644 index 000000000..2e4dad76a --- /dev/null +++ b/src/main/java/io/jboot/support/seata/tcc/TCCFenceHandler.java @@ -0,0 +1,325 @@ +package io.jboot.support.seata.tcc; + +/** + * @author zhangxn + * @date 2022/5/30 21:32 + */ + +import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.DbPro; +import com.jfinal.plugin.activerecord.IAtom; +import io.jboot.db.datasource.DataSourceBuilder; +import io.jboot.db.datasource.DataSourceConfig; +import io.jboot.db.datasource.DataSourceConfigManager; +import io.jboot.utils.StrUtil; +import io.seata.common.exception.FrameworkErrorCode; +import io.seata.common.thread.NamedThreadFactory; +import io.seata.rm.tcc.TwoPhaseResult; +import io.seata.rm.tcc.constant.TCCFenceConstant; +import io.seata.rm.tcc.exception.TCCFenceException; +import io.seata.rm.tcc.store.TCCFenceDO; +import io.seata.rm.tcc.store.TCCFenceStore; +import io.seata.rm.tcc.store.db.TCCFenceStoreDataBaseDAO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * TCC Fence Handler(idempotent, non_rollback, suspend) + * + * @author kaka2code + */ +public class TCCFenceHandler { + + private TCCFenceHandler() { + throw new IllegalStateException("Utility class"); + } + + private static final Logger LOGGER = LoggerFactory.getLogger(io.seata.rm.tcc.TCCFenceHandler.class); + + private static final TCCFenceStore TCC_FENCE_DAO = TCCFenceStoreDataBaseDAO.getInstance(); + + private static DataSource dataSource; + + + private static final int MAX_THREAD_CLEAN = 1; + + private static final int MAX_QUEUE_SIZE = 500; + + private static final LinkedBlockingQueue LOG_QUEUE = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE); + + private static FenceLogCleanRunnable fenceLogCleanRunnable; + + private static ExecutorService logCleanExecutor; + + static { + try { + initLogCleanExecutor(); + } catch (Exception e) { + LOGGER.error("init fence log clean executor error", e); + } + } + + /** + * tcc prepare method enhanced + * + * @param xid the global transaction id + * @param branchId the branch transaction id + * @param actionName the action name + * @return the boolean + */ + public static Object prepareFence(String xid, Long branchId, String actionName) { + DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig(); + DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build(); + IAtom runnable = () -> { + Connection connection = dataSource.getConnection(); + boolean result = insertTCCFenceLog(connection, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED); + LOGGER.info("TCC fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId); + if (!result) { + throw new TCCFenceException(String.format("Insert tcc fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId), + FrameworkErrorCode.InsertRecordError); + } + return result; + }; + DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName()); + return dbPro.tx(runnable); + } + + /** + * tcc commit method enhanced + * + * @param commitMethod commit method + * @param targetTCCBean target tcc bean + * @param xid the global transaction id + * @param branchId the branch transaction id + * @param args commit method's parameters + * @return the boolean + */ + public static boolean commitFence(Method commitMethod, Object targetTCCBean, + String xid, Long branchId, Object[] args) { + + DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig(); + DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build(); + IAtom runnable = () -> { + Connection connection = dataSource.getConnection(); + TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(connection, xid, branchId); + if (tccFenceDO == null){ + throw new TCCFenceException(String.format("Insert tcc fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId), + FrameworkErrorCode.InsertRecordError); + } + if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) { + LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()); + return true; + } + if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()); + } + return false; + } + try { + return updateStatusAndInvokeTargetMethod(connection, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, args); + } catch (Exception ex) { + throw new TCCFenceException(ex.getCause()); + } + }; + DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName()); + return dbPro.tx(runnable); + } + + /** + * tcc rollback method enhanced + * + * @param rollbackMethod rollback method + * @param targetTCCBean target tcc bean + * @param xid the global transaction id + * @param branchId the branch transaction id + * @param args rollback method's parameters + * @param actionName the action name + * @return the boolean + */ + public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean, + String xid, Long branchId, Object[] args, String actionName) { + DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig(); + DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build(); + IAtom runnable = () -> { + try { + Connection connection = dataSource.getConnection(); + TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(connection, xid, branchId); + if (tccFenceDO == null){ + boolean result = insertTCCFenceLog(connection, xid, branchId, actionName, TCCFenceConstant.STATUS_SUSPENDED); + LOGGER.info("Insert tcc fence record result: {}. xid: {}, branchId: {}", result, xid, branchId); + if (!result) { + throw new TCCFenceException(String.format("Insert tcc fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId), + FrameworkErrorCode.InsertRecordError); + } + return true; + } else { + if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) { + LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()); + return true; + } + if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()); + } + return false; + } + } + return updateStatusAndInvokeTargetMethod(connection, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, args); + } catch (Throwable ex) { + throw new TCCFenceException(ex.getCause()); + } + }; + DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName()); + return dbPro.tx(Connection.TRANSACTION_READ_UNCOMMITTED, runnable); + } + + /** + * Insert TCC fence log + * + * @param conn the db connection + * @param xid the xid + * @param branchId the branchId + * @param status the status + * @return the boolean + */ + private static boolean insertTCCFenceLog(Connection conn, String xid, Long branchId, String actionName, Integer status) { + TCCFenceDO tccFenceDO = new TCCFenceDO(); + tccFenceDO.setXid(xid); + tccFenceDO.setBranchId(branchId); + tccFenceDO.setActionName(actionName); + tccFenceDO.setStatus(status); + return TCC_FENCE_DAO.insertTCCFenceDO(conn, tccFenceDO); + } + + /** + * Update TCC Fence status and invoke target method + * + * @param method target method + * @param targetTCCBean target bean + * @param xid the global transaction id + * @param branchId the branch transaction id + * @param status the tcc fence status + * @return the boolean + */ + private static boolean updateStatusAndInvokeTargetMethod(Connection conn, Method method, Object targetTCCBean, + String xid, Long branchId, int status, Object[] args) throws Exception { + boolean result = TCC_FENCE_DAO.updateTCCFenceDO(conn, xid, branchId, status, TCCFenceConstant.STATUS_TRIED); + if (result) { + // invoke two phase method + Object ret = method.invoke(targetTCCBean, args); + if (null != ret) { + if (ret instanceof TwoPhaseResult) { + result = ((TwoPhaseResult) ret).isSuccess(); + } else { + result = (boolean) ret; + } + } + } + return result; + } + + private static void initLogCleanExecutor() { + logCleanExecutor = new ThreadPoolExecutor(MAX_THREAD_CLEAN, MAX_THREAD_CLEAN, Integer.MAX_VALUE, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), + new NamedThreadFactory("fenceLogCleanThread", MAX_THREAD_CLEAN, true) + ); + fenceLogCleanRunnable = new FenceLogCleanRunnable(); + logCleanExecutor.submit(fenceLogCleanRunnable); + } + + /** + * Delete TCC Fence + * + * @param xid the global transaction id + * @param branchId the branch transaction id + * @return the boolean + */ + public static boolean deleteFence(String xid, Long branchId) { + + DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig(); + DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build(); + IAtom runnable = () -> { + try { + Connection connection = dataSource.getConnection(); + boolean ret = TCC_FENCE_DAO.deleteTCCFenceDO(connection, xid, branchId); + return ret; + } catch (Throwable ex) { + return false; + } + }; + DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName()); + return dbPro.tx(Connection.TRANSACTION_READ_UNCOMMITTED, runnable); + } + + private static void addToLogCleanQueue(final String xid, final long branchId) { + FenceLogIdentity logIdentity = new FenceLogIdentity(); + logIdentity.setXid(xid); + logIdentity.setBranchId(branchId); + try { + LOG_QUEUE.add(logIdentity); + } catch (Exception e) { + LOGGER.warn("Insert tcc fence record into queue for async delete error,xid:{},branchId:{}", xid, branchId, e); + } + } + + /** + * clean fence log that has the final status runnable. + * + * @see TCCFenceConstant + */ + private static class FenceLogCleanRunnable implements Runnable { + @Override + public void run() { + while (true) { + try { + FenceLogIdentity logIdentity = LOG_QUEUE.take(); + boolean ret = deleteFence(logIdentity.getXid(), logIdentity.getBranchId()); + if (!ret) { + LOGGER.error("delete fence log failed, xid: {}, branchId: {}", logIdentity.getXid(), logIdentity.getBranchId()); + } + } catch (InterruptedException e) { + LOGGER.error("take fence log from queue for clean be interrupted", e); + } catch (Exception e) { + LOGGER.error("exception occur when clean fence log", e); + } + } + } + } + + private static class FenceLogIdentity { + /** + * the global transaction id + */ + private String xid; + + /** + * the branch transaction id + */ + private Long branchId; + + public String getXid() { + return xid; + } + + public Long getBranchId() { + return branchId; + } + + public void setXid(String xid) { + this.xid = xid; + } + + public void setBranchId(Long branchId) { + this.branchId = branchId; + } + } +} diff --git a/src/main/resources/META-INF/seata/io.seata.core.model.ResourceManager b/src/main/resources/META-INF/seata/io.seata.core.model.ResourceManager new file mode 100644 index 000000000..e6494fdd8 --- /dev/null +++ b/src/main/resources/META-INF/seata/io.seata.core.model.ResourceManager @@ -0,0 +1 @@ +io.jboot.support.seata.tcc.JbootTCCResourceManager \ No newline at end of file diff --git a/src/test/java/io/jboot/test/seata/commons/fescar_.sql b/src/test/java/io/jboot/test/seata/commons/fescar_.sql index 07c6c16ec..4e37b9063 100644 --- a/src/test/java/io/jboot/test/seata/commons/fescar_.sql +++ b/src/test/java/io/jboot/test/seata/commons/fescar_.sql @@ -49,3 +49,15 @@ CREATE TABLE `undo_log` ( PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; + +CREATE TABLE `tcc_fence_log` ( + `xid` varchar(128) NOT NULL COMMENT 'global id', + `branch_id` bigint NOT NULL COMMENT 'branch id', + `action_name` varchar(64) NOT NULL COMMENT 'action name', + `status` tinyint NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)', + `gmt_create` datetime(3) NOT NULL COMMENT 'create time', + `gmt_modified` datetime(3) NOT NULL COMMENT 'update time', + PRIMARY KEY (`xid`,`branch_id`), + KEY `idx_gmt_modified` (`gmt_modified`), + KEY `idx_status` (`status`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; \ No newline at end of file diff --git a/src/test/java/io/jboot/test/seata/service/impl/TccActionOneServiceImpl.java b/src/test/java/io/jboot/test/seata/service/impl/TccActionOneServiceImpl.java index 63ca7c4c6..1225d2963 100644 --- a/src/test/java/io/jboot/test/seata/service/impl/TccActionOneServiceImpl.java +++ b/src/test/java/io/jboot/test/seata/service/impl/TccActionOneServiceImpl.java @@ -22,7 +22,7 @@ public class TccActionOneServiceImpl implements TccActionOneService { private IAccountService accountService; @Override - @TwoPhaseBusinessAction(name = "TccActionOne" , commitMethod = "commit", rollbackMethod = "rollback") + @TwoPhaseBusinessAction(name = "TccActionOne" , commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true) public boolean prepare(BusinessActionContext actionContext, String account,int money, boolean flag) { System.out.println("actionContext获取Xid prepare>>> "+actionContext.getXid()); System.out.println("actionContext获取TCC参数 prepare>>> "+actionContext.getActionContext("account")); diff --git a/src/test/java/io/jboot/test/seata/starter/AccountApplicaiton.java b/src/test/java/io/jboot/test/seata/starter/AccountApplicaiton.java index db3d1034d..0370ba94f 100644 --- a/src/test/java/io/jboot/test/seata/starter/AccountApplicaiton.java +++ b/src/test/java/io/jboot/test/seata/starter/AccountApplicaiton.java @@ -25,7 +25,7 @@ public static void main(String[] args) { JbootApplication.setBootArg("jboot.datasource.type", "mysql"); JbootApplication.setBootArg("jboot.datasource.url", "jdbc:mysql://127.0.0.1:3306/mini?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull"); JbootApplication.setBootArg("jboot.datasource.user", "root"); - JbootApplication.setBootArg("jboot.datasource.password", "zhang123"); + JbootApplication.setBootArg("jboot.datasource.password", "zxn123"); JbootApplication.setBootArg("jboot.model.unscanPackage", "*"); JbootApplication.setBootArg("jboot.model.scanPackage", "io.jboot.test.seata.commons"); diff --git a/src/test/java/io/jboot/test/seata/starter/WebApplication.java b/src/test/java/io/jboot/test/seata/starter/WebApplication.java index d8642529d..fa16a14b4 100644 --- a/src/test/java/io/jboot/test/seata/starter/WebApplication.java +++ b/src/test/java/io/jboot/test/seata/starter/WebApplication.java @@ -33,7 +33,7 @@ public static void main(String[] args) { JbootApplication.setBootArg("jboot.datasource.type", "mysql"); JbootApplication.setBootArg("jboot.datasource.url", "jdbc:mysql://127.0.0.1:3306/mini?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull"); JbootApplication.setBootArg("jboot.datasource.user", "root"); - JbootApplication.setBootArg("jboot.datasource.password", "zhang123"); + JbootApplication.setBootArg("jboot.datasource.password", "zxn123"); JbootApplication.setBootArg("jboot.model.unscanPackage", "*"); JbootApplication.setBootArg("jboot.model.scanPackage", "io.jboot.test.seata.commons");