Skip to content

Commit

Permalink
!82 seata版本升级以及TCC模式代码修改优化
Browse files Browse the repository at this point in the history
Merge pull request !82 from 扫地猿/master
  • Loading branch information
yangfuhai authored and gitee-org committed Jun 12, 2022
2 parents 8237a78 + 062bf81 commit 42d67d7
Show file tree
Hide file tree
Showing 11 changed files with 651 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<sentinel.version>1.8.3</sentinel.version>
<dubbo.version>2.7.15</dubbo.version>
<resteasy.version>3.13.2.Final</resteasy.version>
<seata.version>1.4.2</seata.version>
<seata.version>1.5.1</seata.version>
<motan.version>1.1.8</motan.version>
<metrics.version>4.2.9</metrics.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public TransactionInfo getTransactionInfo() {
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getCause();
default:
throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Object execute() throws Throwable {
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
config.setLockRetryInterval(globalLockAnno.lockRetryInternal());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@
import io.seata.rm.DefaultResourceManager;
import io.seata.rm.tcc.TCCResource;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.BusinessActionContextUtil;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import io.seata.rm.tcc.interceptor.ActionContextUtil;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.HashMap;
Expand All @@ -40,7 +44,6 @@
* @author zhangsen/菜农 commit: https://gitee.com/fuhai/jboot/commit/55564bfd9e6eebfc39263291d89592cd16f77498
*/
public class ActionInterceptorHandler {

private static final Log LOGGER = Log.getLog(TccActionInterceptor.class);

/**
Expand All @@ -50,10 +53,9 @@ public class ActionInterceptorHandler {
* @param arguments the arguments
* @param businessAction the business action
* @return map map
* @throws Throwable the throwable
*/
public void proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
Invocation invocation) {
Invocation invocation) {

//TCC name
String actionName = businessAction.name();
Expand All @@ -79,8 +81,30 @@ public void proceed(Method method, Object[] arguments, String xid, TwoPhaseBusin
e.printStackTrace();
}
actionContext.setBranchId(branchId);

invocation.invoke();
// save the previous action context
BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext();
try {
//share actionContext implicitly
BusinessActionContextUtil.setContext(actionContext);
if (businessAction.useTCCFence()) {
// Use TCC Fence, and return the business result
TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName);
}
invocation.invoke();
} finally {
try {
//to report business action context finally if the actionContext.getUpdated() is true
BusinessActionContextUtil.reportContext(actionContext);
} finally {
if (previousActionContext != null) {
// recovery the previous action context
BusinessActionContextUtil.setContext(previousActionContext);
} else {
// clear the action context
BusinessActionContextUtil.clear();
}
}
}
}

/**
Expand Down Expand Up @@ -153,6 +177,7 @@ protected void initBusinessContext(Map<String, Object> context, Method method,
context.put(Constants.COMMIT_METHOD, businessAction.commitMethod());
context.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod());
context.put(Constants.ACTION_NAME, businessAction.name());
context.put(Constants.USE_TCC_FENCE, businessAction.useTCCFence());
}
}

Expand Down Expand Up @@ -193,10 +218,42 @@ public void registryResource(Method m, Object interfaceClass, Class[] argument
tccResource.setRollbackMethod(ReflectionUtil
.getMethod(interfaceClass.getClass(), businessAction.rollbackMethod(),
new Class[] {BusinessActionContext.class}));
// set argsClasses
tccResource.setCommitArgsClasses(businessAction.commitArgsClasses());
tccResource.setRollbackArgsClasses(businessAction.rollbackArgsClasses());
// set phase two method's keys
tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),
businessAction.commitArgsClasses()));
tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),
businessAction.rollbackArgsClasses()));
//registry tcc resource
DefaultResourceManager.get().registerResource(tccResource);
}
}


protected String[] getTwoPhaseArgs(Method method, Class<?>[] argsClasses) {
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
String[] keys = new String[parameterAnnotations.length];
/*
* get parameter's key
* if method's parameter list is like
* (BusinessActionContext, @BusinessActionContextParameter("a") A a, @BusinessActionContextParameter("b") B b)
* the keys will be [null, a, b]
*/
for (int i = 0; i < parameterAnnotations.length; i++) {
for (int j = 0; j < parameterAnnotations[i].length; j++) {
if (parameterAnnotations[i][j] instanceof BusinessActionContextParameter) {
BusinessActionContextParameter param = (BusinessActionContextParameter)parameterAnnotations[i][j];
String key = ActionContextUtil.getParamNameFromAnnotation(param);
keys[i] = key;
break;
}
}
if (keys[i] == null && !(argsClasses[i].equals(BusinessActionContext.class))) {
throw new IllegalArgumentException("non-BusinessActionContext parameter should use annotation " +
"BusinessActionContextParameter");
}
}
return keys;
}
}
242 changes: 242 additions & 0 deletions src/main/java/io/jboot/support/seata/tcc/JbootTCCResourceManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package io.jboot.support.seata.tcc;

import com.alibaba.fastjson.JSON;
import io.seata.common.Constants;
import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.exception.SkipCallbackWrapperException;
import io.seata.common.util.StringUtils;
import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus;
import io.seata.core.model.BranchType;
import io.seata.core.model.Resource;
import io.seata.rm.AbstractResourceManager;
import io.seata.rm.tcc.TCCResource;
import io.seata.rm.tcc.TwoPhaseResult;
import io.seata.rm.tcc.api.BusinessActionContext;

import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author zhangxn
*/
public class JbootTCCResourceManager extends AbstractResourceManager {

/**
* TCC resource cache
*/
private Map<String, Resource> tccResourceCache = new ConcurrentHashMap<>();

/**
* Instantiates a new Tcc resource manager.
*/
public JbootTCCResourceManager() {
// not do anything
}

/**
* registry TCC resource
*
* @param resource The resource to be managed.
*/
@Override
public void registerResource(Resource resource) {
TCCResource tccResource = (TCCResource)resource;
tccResourceCache.put(tccResource.getResourceId(), tccResource);
super.registerResource(tccResource);
}

@Override
public Map<String, Resource> getManagedResources() {
return tccResourceCache;
}

/**
* TCC branch commit
*
* @param branchType
* @param xid Transaction id.
* @param branchId Branch id.
* @param resourceId Resource id.
* @param applicationData Application data bind with this branch.
* @return BranchStatus
* @throws TransactionException TransactionException
*/
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
}
try {
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
Object ret;
boolean result;
// add idempotent and anti hanging
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
ret = commitMethod.invoke(targetTCCBean, null);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
} else {
result = true;
}
}
LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
}

/**
* TCC branch rollback
*
* @param branchType the branch type
* @param xid Transaction id.
* @param branchId Branch id.
* @param resourceId Resource id.
* @param applicationData Application data bind with this branch.
* @return BranchStatus
* @throws TransactionException TransactionException
*/
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
}
Object targetTCCBean = tccResource.getTargetBean();
Method rollbackMethod = tccResource.getRollbackMethod();
if (targetTCCBean == null || rollbackMethod == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
}
try {
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);

Object ret;
boolean result;
// add idempotent and anti hanging
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
args, tccResource.getActionName());
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
ret = rollbackMethod.invoke(targetTCCBean,args);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
} else {
result = true;
}
}
LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}

/**
* transfer tcc applicationData to BusinessActionContext
*
* @param xid the xid
* @param branchId the branch id
* @param resourceId the resource id
* @param applicationData the application data
* @return business action context
*/
protected BusinessActionContext getBusinessActionContext(String xid, long branchId, String resourceId,
String applicationData) {
Map actionContextMap = null;
if (StringUtils.isNotBlank(applicationData)) {
Map tccContext = JSON.parseObject(applicationData, Map.class);
actionContextMap = (Map)tccContext.get(Constants.TCC_ACTION_CONTEXT);
}
if (actionContextMap == null) {
actionContextMap = new HashMap<>(2);
}

//instance the action context
BusinessActionContext businessActionContext = new BusinessActionContext(
xid, String.valueOf(branchId), actionContextMap);
businessActionContext.setActionName(resourceId);
return businessActionContext;
}

/**
* get phase two commit method's args
* @param tccResource tccResource
* @param businessActionContext businessActionContext
* @return args
*/
private Object[] getTwoPhaseCommitArgs(TCCResource tccResource, BusinessActionContext businessActionContext) {
String[] keys = tccResource.getPhaseTwoCommitKeys();
Class<?>[] argsCommitClasses = tccResource.getCommitArgsClasses();
return this.getTwoPhaseMethodParams(keys, argsCommitClasses, businessActionContext);
}

/**
* get phase two rollback method's args
* @param tccResource tccResource
* @param businessActionContext businessActionContext
* @return args
*/
private Object[] getTwoPhaseRollbackArgs(TCCResource tccResource, BusinessActionContext businessActionContext) {
String[] keys = tccResource.getPhaseTwoRollbackKeys();
Class<?>[] argsRollbackClasses = tccResource.getRollbackArgsClasses();
return this.getTwoPhaseMethodParams(keys, argsRollbackClasses, businessActionContext);
}

private Object[] getTwoPhaseMethodParams(String[] keys, Class<?>[] argsClasses, BusinessActionContext businessActionContext) {
Object[] args = new Object[argsClasses.length];
for (int i = 0; i < argsClasses.length; i++) {
if (argsClasses[i].equals(BusinessActionContext.class)) {
args[i] = businessActionContext;
} else {
args[i] = businessActionContext.getActionContext(keys[i], argsClasses[i]);
}
}
return args;
}

@Override
public BranchType getBranchType() {
return BranchType.TCC;
}
}
Loading

0 comments on commit 42d67d7

Please sign in to comment.