diff --git a/README.md b/README.md index 5d55ed9..6db5586 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,8 @@ Eagle是一个分布式的RPC框架,支持灵活的配置,支持分布式追 > * cd eagle-benchmark > * mvn clean install > * cd eagle-benchmark-server/target -> * tar -zxvf eagle-benchmark-server-1.5-assembly.tar.gz -> * cd eagle-benchmark-server-1.5 +> * tar -zxvf eagle-benchmark-server-1.6-assembly.tar.gz +> * cd eagle-benchmark-server-1.6 > * bin/start.sh > * cd eagle-benchmark/eagle-benchmark-client > * 在linux上运行 sh benchmark.sh,在window上运行 benchmark.cmd @@ -44,22 +44,22 @@ Eagle是一个分布式的RPC框架,支持灵活的配置,支持分布式追 org.jfaster.eagle eagle-core - 1.5 + 1.6 org.jfaster.eagle eagle-registry-zookeeper - 1.5 + 1.6 org.jfaster.eagle eagle-transport-netty - 1.5 + 1.6 org.jfaster.eagle eagle-spring-support - 1.5 + 1.6 ``` 如果是springBoot,添加如下: @@ -67,22 +67,22 @@ Eagle是一个分布式的RPC框架,支持灵活的配置,支持分布式追 org.jfaster.eagle eagle-core - 1.5 + 1.6 org.jfaster.eagle eagle-registry-zookeeper - 1.5 + 1.6 org.jfaster.eagle eagle-transport-netty - 1.5 + 1.6 org.jfaster.eagle spring-boot-starter-eagle - 1.5 + 1.6 ``` ## 分布式调用追踪 @@ -843,8 +843,8 @@ log4j.appender.CONSOLE.layout.ConversionPattern=%d [%T] %-5p %c{1}:%L - %m%n # 后台管理界面 > eagle 提供可视化的后台管理,方便查看和修改配置。 > 启动后台的步骤 - * tar -zxvf eagle-ui-1.5.tar.gz - * cd eagle-ui-1.5 + * tar -zxvf eagle-ui-1.6.tar.gz + * cd eagle-ui-1.6 * vim conf/eagle.conf 修改用户名、密码、jvm参数、日志路径、端口号等 * sh bin/eagle.sh start diff --git a/eagle-benchmark/eagle-benchmark-api/pom.xml b/eagle-benchmark/eagle-benchmark-api/pom.xml index 11a4d77..b04b15f 100644 --- a/eagle-benchmark/eagle-benchmark-api/pom.xml +++ b/eagle-benchmark/eagle-benchmark-api/pom.xml @@ -3,7 +3,7 @@ eagle-benchmark org.jfaster.eagle - 1.5 + 1.6 4.0.0 diff --git a/eagle-benchmark/eagle-benchmark-client/pom.xml b/eagle-benchmark/eagle-benchmark-client/pom.xml index a0e177f..9f22dcd 100644 --- a/eagle-benchmark/eagle-benchmark-client/pom.xml +++ b/eagle-benchmark/eagle-benchmark-client/pom.xml @@ -3,7 +3,7 @@ eagle-benchmark org.jfaster.eagle - 1.5 + 1.6 4.0.0 diff --git a/eagle-benchmark/eagle-benchmark-server/pom.xml b/eagle-benchmark/eagle-benchmark-server/pom.xml index 917bdf6..164ff19 100644 --- a/eagle-benchmark/eagle-benchmark-server/pom.xml +++ b/eagle-benchmark/eagle-benchmark-server/pom.xml @@ -3,7 +3,7 @@ eagle-benchmark org.jfaster.eagle - 1.5 + 1.6 4.0.0 diff --git a/eagle-benchmark/pom.xml b/eagle-benchmark/pom.xml index 5250ae2..c28975b 100644 --- a/eagle-benchmark/pom.xml +++ b/eagle-benchmark/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.jfaster.eagle - 1.5 + 1.6 eagle-benchmark pom diff --git a/eagle-core/pom.xml b/eagle-core/pom.xml index daa29c3..b0b11a6 100755 --- a/eagle-core/pom.xml +++ b/eagle-core/pom.xml @@ -3,7 +3,7 @@ eagle org.jfaster.eagle - 1.5 + 1.6 4.0.0 diff --git a/eagle-core/src/main/java/eagle/jfaster/org/cluster/cluster/EagleReferCluster.java b/eagle-core/src/main/java/eagle/jfaster/org/cluster/cluster/EagleReferCluster.java index 369c095..5e5df36 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/cluster/cluster/EagleReferCluster.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/cluster/cluster/EagleReferCluster.java @@ -21,7 +21,6 @@ import eagle.jfaster.org.cluster.LoadBalance; import eagle.jfaster.org.cluster.ReferCluster; import eagle.jfaster.org.config.common.MergeConfig; -import eagle.jfaster.org.exception.EagleFrameException; import eagle.jfaster.org.exception.MockException; import eagle.jfaster.org.logging.InternalLogger; import eagle.jfaster.org.logging.InternalLoggerFactory; diff --git a/eagle-core/src/main/java/eagle/jfaster/org/cluster/proxy/AbstractReferInvokeHandler.java b/eagle-core/src/main/java/eagle/jfaster/org/cluster/proxy/AbstractReferInvokeHandler.java index 86c8835..0354deb 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/cluster/proxy/AbstractReferInvokeHandler.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/cluster/proxy/AbstractReferInvokeHandler.java @@ -91,12 +91,11 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl } EagleRequest request = new EagleRequest(); request.setInterfaceName(interfaceName); - String opaque = TraceContext.getOpaque(); - if(Strings.isNullOrEmpty(opaque)){ - opaque = OpaqueGenerator.getDistributeOpaque(); - TraceContext.setOpaque(opaque); + String traceId = TraceContext.getTraceId(); + if(!Strings.isNullOrEmpty(traceId)){ + request.setAttachment(TraceContext.TRACE_KEY,traceId); } - request.setOpaque(opaque); + request.setOpaque(OpaqueGenerator.getOpaque()); request.setParameters(args); request.setMethodName(method.getName()); request.setNeedCompress(compress); diff --git a/eagle-core/src/main/java/eagle/jfaster/org/cluster/proxy/AsyncInvokeHandler.java b/eagle-core/src/main/java/eagle/jfaster/org/cluster/proxy/AsyncInvokeHandler.java index 1173d59..82a9b9b 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/cluster/proxy/AsyncInvokeHandler.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/cluster/proxy/AsyncInvokeHandler.java @@ -22,9 +22,7 @@ import eagle.jfaster.org.util.ReflectUtil; import java.lang.reflect.Method; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * Created by fangyanpeng1 on 2017/8/7. diff --git a/eagle-core/src/main/java/eagle/jfaster/org/codec/Codec.java b/eagle-core/src/main/java/eagle/jfaster/org/codec/Codec.java index 9fc07ed..089e45d 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/codec/Codec.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/codec/Codec.java @@ -31,5 +31,5 @@ public interface Codec { ByteBuffer encode(Object message,Serialization serialization) throws IOException; - Object decode(ByteBuffer buffer,Serialization serialization,String opaque,short magicCode) throws IOException; + Object decode(ByteBuffer buffer,Serialization serialization,int opaque,short magicCode) throws IOException; } diff --git a/eagle-core/src/main/java/eagle/jfaster/org/codec/support/EagleCodec.java b/eagle-core/src/main/java/eagle/jfaster/org/codec/support/EagleCodec.java index 0ddcbde..abdb843 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/codec/support/EagleCodec.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/codec/support/EagleCodec.java @@ -40,7 +40,7 @@ /** * 编解码器 * - * 请求协议:magicCode(2个字节) + opaque(32个字节) + * 请求协议:magicCode(2个字节) + opaque(4个字节) * + interfaceName长度(2个字节) +interfaceName * + methodName长度(2个字节) + methodName * + parameterDesc长度(4个字节) + parameterDesc @@ -83,7 +83,7 @@ private ByteBuffer encodeResponse(Response response,Serialization serialization) if(response.isNeedCompress()){ magicCode |= EAGLE_COMPRESS_TYPE; } - int dataLen = 38;//totalLen+magic+opaque 4+2+32 + int dataLen = 10;//totalLen+magic+opaque 4+2+4 ByteBuffer content; if(response.getException() != null){ content = encodeResponseCommon(response.getException(),dataLen,magicCode,response.getOpaque(),serialization,false,EAGLE_RESPONSE_EXCEPTION); @@ -93,14 +93,14 @@ private ByteBuffer encodeResponse(Response response,Serialization serialization) content = ByteBuffer.allocate(dataLen); content.putInt(dataLen); content.putShort(magicCode); - content.put(response.getOpaque().getBytes()); + content.putInt(response.getOpaque()); } content.flip(); return content; } - private ByteBuffer encodeResponseCommon(Object res,int dataLen,short magicCode,String opaque,Serialization serialization,boolean compress,short flag) throws IOException { + private ByteBuffer encodeResponseCommon(Object res,int dataLen,short magicCode,int opaque,Serialization serialization,boolean compress,short flag) throws IOException { magicCode |= flag; String className = res.getClass().getName(); byte[] nameData = className.getBytes(CHARSET_UTF8); @@ -115,7 +115,7 @@ private ByteBuffer encodeResponseCommon(Object res,int dataLen,short magicCode,S ByteBuffer content = ByteBuffer.allocate(dataLen); content.putInt(dataLen); content.putShort(magicCode); - content.put(opaque.getBytes(CHARSET_UTF8)); + content.putInt(opaque); content.putShort((short) nameData.length); content.put(nameData); content.putInt(valData.length); @@ -132,7 +132,7 @@ private ByteBuffer encodeRequest(Request request,Serialization serialization) th if(request.isNeedCompress()){ magicCode |= EAGLE_COMPRESS_TYPE; } - int dataLen = 42;//totalLen+magic+opaque+attachments 4+2+32+4 + int dataLen = 14;//totalLen+magic+opaque+attachments 4+2+4+4 String interfaceName = request.getInterfaceName(); byte[] iNameData = interfaceName.getBytes(CHARSET_UTF8); dataLen += iNameData.length + 2; @@ -181,11 +181,11 @@ private ByteBuffer encodeRequest(Request request,Serialization serialization) th return content; } - private ByteBuffer encodeReqCommon(int dataLen,short magicCode,String opaque,byte[] iNameData,byte[] mNameData){ + private ByteBuffer encodeReqCommon(int dataLen,short magicCode,int opaque,byte[] iNameData,byte[] mNameData){ ByteBuffer content = ByteBuffer.allocate(dataLen); content.putInt(dataLen); content.putShort(magicCode); - content.put(opaque.getBytes(CHARSET_UTF8)); + content.putInt(opaque); content.putShort((short) iNameData.length); content.put(iNameData); content.putShort((short) mNameData.length); @@ -230,7 +230,7 @@ public static byte[] encodeAttachments(Map map) { } @Override - public Object decode(ByteBuffer buffer, Serialization serialization, String opaque, short magicCode) throws IOException { + public Object decode(ByteBuffer buffer, Serialization serialization, int opaque, short magicCode) throws IOException { try { if(isRequest(magicCode)){ return decodeRequest(buffer,serialization,opaque,magicCode); @@ -242,7 +242,7 @@ public Object decode(ByteBuffer buffer, Serialization serialization, String opaq } } - private Object decodeRequest(ByteBuffer buffer,Serialization serialization,String opaque,short magicCode) + private Object decodeRequest(ByteBuffer buffer,Serialization serialization,int opaque,short magicCode) throws IOException, ClassNotFoundException { EagleRequest request = new EagleRequest(); request.setOpaque(opaque); @@ -318,7 +318,7 @@ private Map decodeRequestAttachments(ByteBuffer buffer) throws I return attachments; } - public Object decodeResponse(ByteBuffer buffer, Serialization serialization, String opaque, short magicCode) + public Object decodeResponse(ByteBuffer buffer, Serialization serialization, int opaque, short magicCode) throws ClassNotFoundException, IOException { EagleResponse response = new EagleResponse(); response.setOpaque(opaque); diff --git a/eagle-core/src/main/java/eagle/jfaster/org/rpc/Request.java b/eagle-core/src/main/java/eagle/jfaster/org/rpc/Request.java index 59ae214..a2c29b3 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/rpc/Request.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/rpc/Request.java @@ -24,7 +24,7 @@ */ public interface Request { - String getOpaque(); + int getOpaque(); String getInterfaceName(); diff --git a/eagle-core/src/main/java/eagle/jfaster/org/rpc/Response.java b/eagle-core/src/main/java/eagle/jfaster/org/rpc/Response.java index 069e28a..9ce6cc6 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/rpc/Response.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/rpc/Response.java @@ -32,7 +32,7 @@ public interface Response { boolean isNeedCompress(); //请求的唯一标识 - String getOpaque(); + int getOpaque(); //附加信息 Map getAttachments(); diff --git a/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/EagleRequest.java b/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/EagleRequest.java index 8e70acd..fa7196e 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/EagleRequest.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/EagleRequest.java @@ -31,7 +31,7 @@ public class EagleRequest implements Request { @Setter - private String opaque; + private int opaque; @Setter private String interfaceName; @@ -52,7 +52,7 @@ public class EagleRequest implements Request { private Map attachments; @Override - public String getOpaque() { + public int getOpaque() { return opaque; } diff --git a/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/EagleResponse.java b/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/EagleResponse.java index 7fef2c6..170e722 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/EagleResponse.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/EagleResponse.java @@ -37,7 +37,7 @@ public class EagleResponse implements Response { private Exception exception; @Setter - private String opaque; + private int opaque; @Setter private boolean needCompress; @@ -61,7 +61,7 @@ public boolean isNeedCompress() { } @Override - public String getOpaque() { + public int getOpaque() { return opaque; } diff --git a/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/TraceContext.java b/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/TraceContext.java index 923fb73..e18985e 100644 --- a/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/TraceContext.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/rpc/support/TraceContext.java @@ -21,18 +21,20 @@ */ public class TraceContext { - private static InheritableThreadLocal opaqueManager = new InheritableThreadLocal<>(); + public static String TRACE_KEY="traceId"; - public static String getOpaque(){ - return opaqueManager.get(); + private static InheritableThreadLocal traceIdManager = new InheritableThreadLocal<>(); + + public static String getTraceId(){ + return traceIdManager.get(); } - public static void setOpaque(String opaque){ - opaqueManager.set(opaque); + public static void setTraceId(String traceId){ + traceIdManager.set(traceId); } public static void clear(){ - opaqueManager.remove(); + traceIdManager.remove(); } diff --git a/eagle-core/src/main/java/eagle/jfaster/org/trace/log4j/TraceIdPatternConverter.java b/eagle-core/src/main/java/eagle/jfaster/org/trace/log4j/TraceIdPatternConverter.java index 4750ac0..e8770f2 100644 --- a/eagle-core/src/main/java/eagle/jfaster/org/trace/log4j/TraceIdPatternConverter.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/trace/log4j/TraceIdPatternConverter.java @@ -29,6 +29,6 @@ public class TraceIdPatternConverter extends PatternConverter { @Override protected String convert(LoggingEvent loggingEvent) { - return Strings.isNullOrEmpty(TraceContext.getOpaque()) ? "N/A" : TraceContext.getOpaque(); + return Strings.isNullOrEmpty(TraceContext.getTraceId()) ? "N/A" : TraceContext.getTraceId(); } } diff --git a/eagle-core/src/main/java/eagle/jfaster/org/trace/logback/LogbackPatternConverter.java b/eagle-core/src/main/java/eagle/jfaster/org/trace/logback/LogbackPatternConverter.java index 79ab568..3524b95 100644 --- a/eagle-core/src/main/java/eagle/jfaster/org/trace/logback/LogbackPatternConverter.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/trace/logback/LogbackPatternConverter.java @@ -27,6 +27,6 @@ public class LogbackPatternConverter extends ClassicConverter { @Override public String convert(ILoggingEvent iLoggingEvent) { - return Strings.isNullOrEmpty(TraceContext.getOpaque()) ? "N/A" : TraceContext.getOpaque(); + return Strings.isNullOrEmpty(TraceContext.getTraceId()) ? "N/A" : TraceContext.getTraceId(); } } diff --git a/eagle-core/src/main/java/eagle/jfaster/org/transport/support/EagleHeartBeatFactory.java b/eagle-core/src/main/java/eagle/jfaster/org/transport/support/EagleHeartBeatFactory.java index 42b07fa..c833be6 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/transport/support/EagleHeartBeatFactory.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/transport/support/EagleHeartBeatFactory.java @@ -39,7 +39,7 @@ public class EagleHeartBeatFactory implements HeartBeatFactory { @Override public Request createRequest() { EagleRequest request = new EagleRequest(); - request.setOpaque(OpaqueGenerator.getDistributeOpaque()); + request.setOpaque(OpaqueGenerator.getOpaque()); request.setInterfaceName(HEARTBEAT_INTERFACE_NAME); request.setMethodName(HEARTBEAT_METHOD_NAME); return request; diff --git a/eagle-core/src/main/java/eagle/jfaster/org/util/RequestUtil.java b/eagle-core/src/main/java/eagle/jfaster/org/util/RequestUtil.java index ff8a42e..a5320f2 100755 --- a/eagle-core/src/main/java/eagle/jfaster/org/util/RequestUtil.java +++ b/eagle-core/src/main/java/eagle/jfaster/org/util/RequestUtil.java @@ -66,7 +66,7 @@ public static boolean isVoidValue(short magicCode){ return (magicCode & EAGLE_RESPONSE_TYPE) == EAGLE_RESPONSE_VOID; } - public static EagleResponse buildExceptionResponse(String opaque,Exception e){ + public static EagleResponse buildExceptionResponse(int opaque,Exception e){ EagleResponse response = new EagleResponse(); response.setOpaque(opaque); response.setException(e); diff --git a/eagle-example/eagle-example-spring/pom.xml b/eagle-example/eagle-example-spring/pom.xml index ed8066c..565b09f 100755 --- a/eagle-example/eagle-example-spring/pom.xml +++ b/eagle-example/eagle-example-spring/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.jfaster.eagle eagle-example-spring - 1.5 + 1.6 jar eagle-example-spring diff --git a/eagle-example/eagle-example-springboot/pom.xml b/eagle-example/eagle-example-springboot/pom.xml index 78136ec..2e34d72 100755 --- a/eagle-example/eagle-example-springboot/pom.xml +++ b/eagle-example/eagle-example-springboot/pom.xml @@ -9,7 +9,7 @@ org.jfaster.eagle eagle-example-springboot - 1.5 + 1.6 jar eagle-example-springboot diff --git a/eagle-example/eagle-example-springmvc/pom.xml b/eagle-example/eagle-example-springmvc/pom.xml index e79d7f8..d9222b8 100644 --- a/eagle-example/eagle-example-springmvc/pom.xml +++ b/eagle-example/eagle-example-springmvc/pom.xml @@ -3,7 +3,7 @@ eagle-example org.jfaster.eagle - 1.5 + 1.6 4.0.0 eagle-example-springmvc diff --git a/eagle-example/pom.xml b/eagle-example/pom.xml index 89e79ed..c08a6c9 100755 --- a/eagle-example/pom.xml +++ b/eagle-example/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.jfaster.eagle eagle-example - 1.5 + 1.6 eagle-example-spring eagle-example-springboot diff --git a/eagle-registry-zookeeper/pom.xml b/eagle-registry-zookeeper/pom.xml index c871e2b..1fb4534 100755 --- a/eagle-registry-zookeeper/pom.xml +++ b/eagle-registry-zookeeper/pom.xml @@ -3,7 +3,7 @@ eagle org.jfaster.eagle - 1.5 + 1.6 4.0.0 diff --git a/eagle-spring-support/pom.xml b/eagle-spring-support/pom.xml index 6e74499..aac0c1d 100755 --- a/eagle-spring-support/pom.xml +++ b/eagle-spring-support/pom.xml @@ -3,7 +3,7 @@ eagle org.jfaster.eagle - 1.5 + 1.6 4.0.0 diff --git a/eagle-spring-support/src/main/java/eagle/jfaster/org/bean/trace/EagleTraceCglibProxy.java b/eagle-spring-support/src/main/java/eagle/jfaster/org/bean/trace/EagleTraceCglibProxy.java index a79f423..15e02f5 100644 --- a/eagle-spring-support/src/main/java/eagle/jfaster/org/bean/trace/EagleTraceCglibProxy.java +++ b/eagle-spring-support/src/main/java/eagle/jfaster/org/bean/trace/EagleTraceCglibProxy.java @@ -415,10 +415,10 @@ public Object intercept(Object proxy, Method method, Object[] args, MethodProxy } Object retVal; if(EagleTraceMethodRecods.needTrace(method,targetClass)){ - boolean clear = Strings.isNullOrEmpty(TraceContext.getOpaque()); + boolean clear = Strings.isNullOrEmpty(TraceContext.getTraceId()); try { if(clear){ - TraceContext.setOpaque(OpaqueGenerator.getDistributeOpaque()); + TraceContext.setTraceId(OpaqueGenerator.getDistributeOpaque()); } retVal = methodProxy.invoke(target, args); } catch (Throwable e){ diff --git a/eagle-spring-support/src/main/java/eagle/jfaster/org/bean/trace/EagleTraceJdkProxy.java b/eagle-spring-support/src/main/java/eagle/jfaster/org/bean/trace/EagleTraceJdkProxy.java index d1beb23..a24b550 100644 --- a/eagle-spring-support/src/main/java/eagle/jfaster/org/bean/trace/EagleTraceJdkProxy.java +++ b/eagle-spring-support/src/main/java/eagle/jfaster/org/bean/trace/EagleTraceJdkProxy.java @@ -112,10 +112,10 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl } if(EagleTraceMethodRecods.needTrace(method,targetClass)){ - boolean clear = Strings.isNullOrEmpty(TraceContext.getOpaque()); + boolean clear = Strings.isNullOrEmpty(TraceContext.getTraceId()); try { if(clear){ - TraceContext.setOpaque(OpaqueGenerator.getDistributeOpaque()); + TraceContext.setTraceId(OpaqueGenerator.getDistributeOpaque()); } retVal = AopUtils.invokeJoinpointUsingReflection(target, method, args); } catch (Throwable e){ diff --git a/eagle-transport-netty/pom.xml b/eagle-transport-netty/pom.xml index 0a3a54a..d966a7d 100755 --- a/eagle-transport-netty/pom.xml +++ b/eagle-transport-netty/pom.xml @@ -3,7 +3,7 @@ eagle org.jfaster.eagle - 1.5 + 1.6 4.0.0 diff --git a/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/NettyClient.java b/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/NettyClient.java index d26c386..3941544 100755 --- a/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/NettyClient.java +++ b/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/NettyClient.java @@ -100,7 +100,7 @@ public class NettyClient implements Client,StatisticCallback { private AtomicLong errorCount = new AtomicLong(0); @Getter - private Map callbackMap = new ConcurrentHashMap(256); + private Map callbackMap = new ConcurrentHashMap(256); private ScheduledFuture asyncCallbackMonitorFuture = null; @@ -187,11 +187,11 @@ public AbstractNettyChannel newChannel() throws InterruptedException { return this.callBack == null ? new SyncNettyChannel(this,channel) : new AsyncNettyChannel(this,channel); } - public NettyResponseFuture removeCallBack(String opaque){ + public NettyResponseFuture removeCallBack(Integer opaque){ return callbackMap.remove(opaque); } - public void addCallBack(String opaque,NettyResponseFuture future){ + public void addCallBack(Integer opaque,NettyResponseFuture future){ callbackMap.put(opaque,future); } @@ -208,8 +208,11 @@ public void executeInvokeCallback(final ResponseFuture responseFuture) { runInThisThread = true; } if (runInThisThread) { - TraceContext.setOpaque(((NettyResponseFuture)responseFuture).getOpaque()); try { + Map attachments = ((NettyResponseFuture)responseFuture).getAttachments(); + if(attachments != null){ + TraceContext.setTraceId(attachments.get(TraceContext.TRACE_KEY)); + } responseFuture.executeCallback(); } catch (Throwable e) { logger.info("executeInvokeCallback Exception", e); diff --git a/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/NettyResponseFuture.java b/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/NettyResponseFuture.java index 6e0a8a4..6c7f0c6 100755 --- a/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/NettyResponseFuture.java +++ b/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/NettyResponseFuture.java @@ -22,6 +22,8 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; + +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,7 +38,7 @@ public class NettyResponseFuture implements ResponseFuture { //请求唯一标识 @Getter - private final String opaque; + private final int opaque; //超时时间 @Getter @@ -46,6 +48,9 @@ public class NettyResponseFuture implements ResponseFuture { @Getter private final MethodInvokeCallBack callBack; + @Getter + private final Map attachments; + //请求开始时间 @Getter private final long beginTimestamp = System.currentTimeMillis(); diff --git a/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/channel/AbstractNettyChannel.java b/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/channel/AbstractNettyChannel.java index c25aa0b..7e8818c 100755 --- a/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/channel/AbstractNettyChannel.java +++ b/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/channel/AbstractNettyChannel.java @@ -71,8 +71,8 @@ public AbstractNettyChannel(NettyClient client, Channel channel) { public Object request(Request request, NettySharedConnPool connPool) throws Exception { int timeout = config.getExtInt(ConfigEnum.requestTimeout.getName(),ConfigEnum.requestTimeout.getIntValue()); - final String opaque = request.getOpaque(); - final NettyResponseFuture responseFuture = new NettyResponseFuture(opaque,timeout,callBack); + final int opaque = request.getOpaque(); + final NettyResponseFuture responseFuture = new NettyResponseFuture(opaque,timeout,callBack,request.getAttachments()); try { if(timeout < 0){ throw new EagleFrameException("The request timeout of %s is not allowed to set 0",timeout); diff --git a/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/handler/AbstractMessageChannelHandler.java b/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/handler/AbstractMessageChannelHandler.java index f188d27..940952e 100755 --- a/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/handler/AbstractMessageChannelHandler.java +++ b/eagle-transport-netty/src/main/java/eagle/jfaster/org/client/handler/AbstractMessageChannelHandler.java @@ -37,7 +37,7 @@ public abstract class AbstractMessageChannelHandler extends SimpleChannelInbound @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception { - String opaque = response.getOpaque(); + int opaque = response.getOpaque(); NettyResponseFuture future = client.removeCallBack(opaque); if(future != null){ handle(response,future); diff --git a/eagle-transport-netty/src/main/java/eagle/jfaster/org/coder/NettyDecorder.java b/eagle-transport-netty/src/main/java/eagle/jfaster/org/coder/NettyDecorder.java index be676c6..f0fd0a4 100755 --- a/eagle-transport-netty/src/main/java/eagle/jfaster/org/coder/NettyDecorder.java +++ b/eagle-transport-netty/src/main/java/eagle/jfaster/org/coder/NettyDecorder.java @@ -67,13 +67,11 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception if(isNotIllegal(magicCode)){ throw new EagleFrameException("Error the type: '%d' is not supported",magicCode); } - byte[] opBytes = new byte[32]; - byteBuffer.get(opBytes); - String opaque = new String(opBytes,CHARSET_UTF8); + int opaque = byteBuffer.getInt(); try { return codec.decode(byteBuffer,serialization,opaque,magicCode); }catch (Throwable e){ - logger.error(String.format("%s Error codec decode ",opaque),e); + logger.error("Error codec decode ",e); EagleResponse response; if(e instanceof EagleFrameException){ response = buildExceptionResponse(opaque,(Exception)e); diff --git a/eagle-transport-netty/src/main/java/eagle/jfaster/org/coder/NettyEncoder.java b/eagle-transport-netty/src/main/java/eagle/jfaster/org/coder/NettyEncoder.java index 7fc14d3..725f8a2 100755 --- a/eagle-transport-netty/src/main/java/eagle/jfaster/org/coder/NettyEncoder.java +++ b/eagle-transport-netty/src/main/java/eagle/jfaster/org/coder/NettyEncoder.java @@ -17,11 +17,8 @@ package eagle.jfaster.org.coder; -import eagle.jfaster.org.exception.EagleFrameException; import eagle.jfaster.org.logging.InternalLogger; import eagle.jfaster.org.logging.InternalLoggerFactory; -import eagle.jfaster.org.rpc.Request; -import eagle.jfaster.org.rpc.Response; import eagle.jfaster.org.util.ExceptionUtil; import eagle.jfaster.org.util.RemotingUtil; import eagle.jfaster.org.codec.Codec; @@ -53,13 +50,7 @@ protected void encode(ChannelHandlerContext ctx, Object message, ByteBuf byteBuf ByteBuffer data = codec.encode(message,serialization); byteBuf.writeBytes(data); } catch (Throwable e) { - String opaque = null; - if(message instanceof Request){ - opaque = ((Request) message).getOpaque(); - }else if(message instanceof Response){ - opaque = ((Response) message).getOpaque(); - } - logger.error(opaque + " Error encode message "+RemotingUtil.parseChannelRemoteAddr(ctx.channel()),e); + logger.error("Error encode message "+RemotingUtil.parseChannelRemoteAddr(ctx.channel()),e); RemotingUtil.closeChannel(ctx.channel(),"NettyEncoder encode"); throw ExceptionUtil.handleException(e); } diff --git a/eagle-transport-netty/src/main/java/eagle/jfaster/org/server/MessageChannelHandler.java b/eagle-transport-netty/src/main/java/eagle/jfaster/org/server/MessageChannelHandler.java index a3af7e7..2b34855 100755 --- a/eagle-transport-netty/src/main/java/eagle/jfaster/org/server/MessageChannelHandler.java +++ b/eagle-transport-netty/src/main/java/eagle/jfaster/org/server/MessageChannelHandler.java @@ -30,6 +30,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.RequiredArgsConstructor; + +import java.util.Map; import java.util.concurrent.RejectedExecutionException; /** @@ -54,8 +56,11 @@ protected void channelRead0(final ChannelHandlerContext ctx, final EagleRequest threadExecutor.execute(new Runnable() { @Override public void run() { - TraceContext.setOpaque(request.getOpaque()); - EagleResponse response= (EagleResponse) invokeRouter.routeAndInvoke(request); + Map attachments = request.getAttachments(); + if(attachments != null){ + TraceContext.setTraceId(attachments.get(TraceContext.TRACE_KEY)); + } + EagleResponse response = (EagleResponse) invokeRouter.routeAndInvoke(request); TraceContext.clear(); response.setOpaque(request.getOpaque()); response.setNeedCompress(request.isNeedCompress()); diff --git a/eagle-transport-netty/src/main/java/eagle/jfaster/org/task/AsyncCallbackTask.java b/eagle-transport-netty/src/main/java/eagle/jfaster/org/task/AsyncCallbackTask.java index c0abd7b..fb844b2 100644 --- a/eagle-transport-netty/src/main/java/eagle/jfaster/org/task/AsyncCallbackTask.java +++ b/eagle-transport-netty/src/main/java/eagle/jfaster/org/task/AsyncCallbackTask.java @@ -25,6 +25,8 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; +import java.util.Map; + /** * Created by fangyanpeng on 2017/8/22. */ @@ -38,8 +40,11 @@ public class AsyncCallbackTask implements Runnable{ @Override public void run() { - TraceContext.setOpaque(((NettyResponseFuture)responseFuture).getOpaque()); try { + Map attachments = ((NettyResponseFuture)responseFuture).getAttachments(); + if(attachments != null){ + TraceContext.setTraceId(attachments.get(TraceContext.TRACE_KEY)); + } responseFuture.executeCallback(); } catch (Throwable e) { logger.info("execute callback in executor exception, and callback throw", e); diff --git a/eagle-transport-netty/src/main/java/eagle/jfaster/org/task/TimeoutMonitorTask.java b/eagle-transport-netty/src/main/java/eagle/jfaster/org/task/TimeoutMonitorTask.java index cb67efd..14df36c 100644 --- a/eagle-transport-netty/src/main/java/eagle/jfaster/org/task/TimeoutMonitorTask.java +++ b/eagle-transport-netty/src/main/java/eagle/jfaster/org/task/TimeoutMonitorTask.java @@ -40,15 +40,15 @@ public class TimeoutMonitorTask implements Runnable { @Override public void run() { - Iterator> it = client.getCallbackMap().entrySet().iterator(); + Iterator> it = client.getCallbackMap().entrySet().iterator(); while (it.hasNext()) { - Map.Entry next = it.next(); + Map.Entry next = it.next(); NettyResponseFuture rep = next.getValue(); if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 300) <= System.currentTimeMillis()) { if(rep.getCallBack() == null){ - rep.onFail(new EagleFrameException("%s request timeout, timeout: [%d] ms", client.getConfig().getInterfaceName(),rep.getTimeoutMillis())); + rep.onFail(new EagleFrameException("%s request timeout,requestid:%d,timeout:%d ms", client.getConfig().getInterfaceName(),rep.getOpaque(),rep.getTimeoutMillis())); }else { - rep.setException(new EagleFrameException("%s request timeout, timeout: [%d] ms", client.getConfig().getInterfaceName(),rep.getTimeoutMillis())); + rep.setException(new EagleFrameException("%s request timeout,requestid:%d,timeout:%d ms", client.getConfig().getInterfaceName(),rep.getOpaque(),rep.getTimeoutMillis())); client.executeInvokeCallback(rep); } it.remove(); diff --git a/eagle-ui/pom.xml b/eagle-ui/pom.xml index 0793f34..27c73d4 100755 --- a/eagle-ui/pom.xml +++ b/eagle-ui/pom.xml @@ -9,7 +9,7 @@ 4.0.0 org.jfaster.eagle eagle-ui - 1.5 + 1.6 jar eagle-ui http://maven.apache.org diff --git a/pom.xml b/pom.xml index 6c7d0a4..179b940 100755 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ org.jfaster.eagle eagle - 1.4 + 1.6 eagle-core eagle-registry-zookeeper diff --git a/spring-boot-starter-eagle/pom.xml b/spring-boot-starter-eagle/pom.xml index 72f7880..30f83ea 100644 --- a/spring-boot-starter-eagle/pom.xml +++ b/spring-boot-starter-eagle/pom.xml @@ -12,7 +12,7 @@ 4.0.0 org.jfaster.eagle spring-boot-starter-eagle - 1.5 + 1.6 jar spring-boot-starter-parent eagle for springBoot support