diff --git a/runtime/binding-echo/src/test/java/io/aklivity/zilla/runtime/binding/echo/internal/bench/EchoWorker.java b/runtime/binding-echo/src/test/java/io/aklivity/zilla/runtime/binding/echo/internal/bench/EchoWorker.java index b1945d120e..aa997b0be5 100644 --- a/runtime/binding-echo/src/test/java/io/aklivity/zilla/runtime/binding/echo/internal/bench/EchoWorker.java +++ b/runtime/binding-echo/src/test/java/io/aklivity/zilla/runtime/binding/echo/internal/bench/EchoWorker.java @@ -127,6 +127,12 @@ public EventFormatter supplyEventFormatter() return null; } + @Override + public void report( + Throwable ex) + { + } + @Override public void attachComposite(NamespaceConfig composite) { diff --git a/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/TlsConfiguration.java b/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/TlsConfiguration.java index e85f0e1a53..5bb4aa098d 100644 --- a/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/TlsConfiguration.java +++ b/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/TlsConfiguration.java @@ -15,6 +15,7 @@ */ package io.aklivity.zilla.runtime.binding.tls.internal; +import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_DEBUG; import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_VERBOSE; import io.aklivity.zilla.runtime.engine.Configuration; @@ -27,6 +28,7 @@ public class TlsConfiguration extends Configuration public static final LongPropertyDef TLS_AWAIT_SYNC_CLOSE_MILLIS; public static final BooleanPropertyDef TLS_PROACTIVE_CLIENT_REPLY_BEGIN; public static final BooleanPropertyDef TLS_VERBOSE; + public static final BooleanPropertyDef TLS_DEBUG; private static final ConfigurationDef TLS_CONFIG; @@ -39,6 +41,7 @@ public class TlsConfiguration extends Configuration TLS_AWAIT_SYNC_CLOSE_MILLIS = config.property("await.sync.close.millis", 3000L); TLS_PROACTIVE_CLIENT_REPLY_BEGIN = config.property("proactive.client.reply.begin", false); TLS_VERBOSE = config.property("verbose", TlsConfiguration::verboseDefault); + TLS_DEBUG = config.property("debug", TlsConfiguration::debugDefault); TLS_CONFIG = config; } @@ -78,9 +81,20 @@ public boolean verbose() return TLS_VERBOSE.getAsBoolean(this); } + public boolean debug() + { + return TLS_DEBUG.getAsBoolean(this); + } + private static boolean verboseDefault( Configuration config) { return ENGINE_VERBOSE.getAsBoolean(config); } + + private static boolean debugDefault( + Configuration config) + { + return ENGINE_DEBUG.getAsBoolean(config); + } } diff --git a/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/config/TlsBindingConfig.java b/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/config/TlsBindingConfig.java index b03c3d5cfb..ca02546e4d 100644 --- a/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/config/TlsBindingConfig.java +++ b/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/config/TlsBindingConfig.java @@ -106,7 +106,7 @@ public void init( if (keyManagers[i] instanceof X509ExtendedKeyManager) { X509ExtendedKeyManager keyManager = (X509ExtendedKeyManager) keyManagers[i]; - keyManagers[i] = new TlsClientX509ExtendedKeyManager(keyManager); + keyManagers[i] = new TlsClientX509ExtendedKeyManager(config, keyManager); } } } diff --git a/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/identity/TlsClientX509ExtendedKeyManager.java b/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/identity/TlsClientX509ExtendedKeyManager.java index 545a89939f..f81e42198d 100644 --- a/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/identity/TlsClientX509ExtendedKeyManager.java +++ b/runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/binding/tls/internal/identity/TlsClientX509ExtendedKeyManager.java @@ -19,8 +19,10 @@ import java.security.Principal; import java.security.PrivateKey; import java.security.cert.X509Certificate; +import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLSession; @@ -28,6 +30,8 @@ import javax.net.ssl.X509KeyManager; import javax.security.auth.x500.X500Principal; +import io.aklivity.zilla.runtime.binding.tls.internal.TlsConfiguration; + public final class TlsClientX509ExtendedKeyManager extends X509ExtendedKeyManager implements X509KeyManager { public static final String COMMON_NAME_KEY = "common.name"; @@ -37,10 +41,13 @@ public final class TlsClientX509ExtendedKeyManager extends X509ExtendedKeyManage private final Matcher matchCN = COMMON_NAME_PATTERN.matcher(""); private final X509ExtendedKeyManager delegate; + private final boolean debug; public TlsClientX509ExtendedKeyManager( + TlsConfiguration config, X509ExtendedKeyManager delegate) { + this.debug = config.debug(); this.delegate = delegate; } @@ -119,6 +126,15 @@ else if (keyTypes != null) } } } + if (debug) + { + System.out.printf("[binding-tls] No match found for Subject CN [%s], Key Types [%s], Issuers [%s] \n", + subjectCN, + String.join(", ", keyTypes), + issuers != null + ? Arrays.stream(issuers).map(Principal::getName).collect(Collectors.joining(", ")) + : null); + } } return alias; diff --git a/runtime/binding-tls/src/test/java/io/aklivity/zilla/runtime/binding/tls/internal/bench/TlsWorker.java b/runtime/binding-tls/src/test/java/io/aklivity/zilla/runtime/binding/tls/internal/bench/TlsWorker.java index 9b351ac798..3409b6abc5 100644 --- a/runtime/binding-tls/src/test/java/io/aklivity/zilla/runtime/binding/tls/internal/bench/TlsWorker.java +++ b/runtime/binding-tls/src/test/java/io/aklivity/zilla/runtime/binding/tls/internal/bench/TlsWorker.java @@ -196,6 +196,12 @@ public EventFormatter supplyEventFormatter() return null; } + @Override + public void report( + Throwable ex) + { + } + @Override public void attachComposite( NamespaceConfig composite) diff --git a/runtime/command-start/src/main/java/io/aklivity/zilla/runtime/command/start/internal/airline/ZillaStartCommand.java b/runtime/command-start/src/main/java/io/aklivity/zilla/runtime/command/start/internal/airline/ZillaStartCommand.java index b1a7a32a65..79d462a608 100644 --- a/runtime/command-start/src/main/java/io/aklivity/zilla/runtime/command/start/internal/airline/ZillaStartCommand.java +++ b/runtime/command-start/src/main/java/io/aklivity/zilla/runtime/command/start/internal/airline/ZillaStartCommand.java @@ -18,6 +18,7 @@ import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_CONFIG_URL; import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_DIRECTORY; import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_VERBOSE; +import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_VERBOSE_EXCEPTIONS; import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKERS; import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ZILLA_DIRECTORY_PROPERTY; import static java.lang.Runtime.getRuntime; @@ -31,7 +32,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.function.Consumer; import org.agrona.ErrorHandler; @@ -124,6 +124,11 @@ public void run() props.setProperty(ENGINE_VERBOSE.name(), Boolean.toString(verbose)); } + if (exceptions) + { + props.setProperty(ENGINE_VERBOSE_EXCEPTIONS.name(), Boolean.toString(exceptions)); + } + EngineConfiguration config = new EngineConfiguration(props); Path configPath = Path.of(config.configURI()); @@ -142,13 +147,8 @@ public void run() } } - final Consumer report = exceptions - ? e -> e.printStackTrace(System.err) - : e -> System.err.println(e.getMessage()); - final ErrorHandler onError = ex -> { - report.accept(ex); stop.countDown(); }; diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineBuilder.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineBuilder.java index ad42fc0e61..106dc42387 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineBuilder.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineBuilder.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.LinkedHashSet; import java.util.Set; +import java.util.function.Consumer; import org.agrona.ErrorHandler; @@ -143,8 +144,15 @@ public Engine build() EventFormatterFactory eventFormatterFactory = EventFormatterFactory.instantiate(); final ErrorHandler errorHandler = requireNonNull(this.errorHandler, "errorHandler"); + final Consumer reporter = config.errorReporter(); + + final ErrorHandler onError = ex -> + { + reporter.accept(ex); + errorHandler.onError(ex); + }; return new Engine(config, bindings, exporters, guards, metricGroups, vaults, - catalogs, models, eventFormatterFactory, errorHandler, affinities, readonly); + catalogs, models, eventFormatterFactory, onError, affinities, readonly); } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java index 241e54b592..0745d4c9a5 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java @@ -32,6 +32,7 @@ import java.nio.file.Paths; import java.security.KeyStore; import java.util.Properties; +import java.util.function.Consumer; import java.util.function.Function; import org.agrona.LangUtil; @@ -72,7 +73,9 @@ public class EngineConfiguration extends Configuration public static final BooleanPropertyDef ENGINE_SYNTHETIC_ABORT; public static final LongPropertyDef ENGINE_ROUTED_DELAY_MILLIS; public static final LongPropertyDef ENGINE_CREDITOR_CHILD_CLEANUP_LINGER_MILLIS; + public static final BooleanPropertyDef ENGINE_DEBUG; public static final BooleanPropertyDef ENGINE_VERBOSE; + public static final BooleanPropertyDef ENGINE_VERBOSE_EXCEPTIONS; public static final BooleanPropertyDef ENGINE_VERBOSE_SCHEMA; public static final BooleanPropertyDef ENGINE_VERBOSE_SCHEMA_PLAIN; public static final BooleanPropertyDef ENGINE_VERBOSE_COMPOSITES; @@ -80,6 +83,7 @@ public class EngineConfiguration extends Configuration public static final PropertyDef ENGINE_CACERTS_STORE_TYPE; public static final PropertyDef ENGINE_CACERTS_STORE; public static final PropertyDef ENGINE_CACERTS_STORE_PASS; + public static final PropertyDef ENGINE_ERROR_REPORTER; private static final ConfigurationDef ENGINE_CONFIG; @@ -119,14 +123,18 @@ public class EngineConfiguration extends Configuration ENGINE_SYNTHETIC_ABORT = config.property("synthetic.abort", false); ENGINE_ROUTED_DELAY_MILLIS = config.property("routed.delay.millis", 0L); ENGINE_CREDITOR_CHILD_CLEANUP_LINGER_MILLIS = config.property("child.cleanup.linger", SECONDS.toMillis(5L)); + ENGINE_DEBUG = config.property("debug", false); ENGINE_VERBOSE = config.property("verbose", false); ENGINE_VERBOSE_COMPOSITES = config.property("verbose.composites", false); ENGINE_VERBOSE_SCHEMA = config.property("verbose.schema", false); ENGINE_VERBOSE_SCHEMA_PLAIN = config.property("verbose.schema.plain", false); + ENGINE_VERBOSE_EXCEPTIONS = config.property("exception-traces", false); ENGINE_WORKERS = config.property("workers", Runtime.getRuntime().availableProcessors()); ENGINE_CACERTS_STORE_TYPE = config.property("cacerts.store.type", EngineConfiguration::cacertsStoreTypeDefault); ENGINE_CACERTS_STORE = config.property("cacerts.store", EngineConfiguration::cacertsStoreDefault); ENGINE_CACERTS_STORE_PASS = config.property("cacerts.store.pass"); + ENGINE_ERROR_REPORTER = config.property(ErrorReporter.class, "error.reporter", + EngineConfiguration::decodeErrorReporter, EngineConfiguration::defaultErrorReporter); ENGINE_CONFIG = config; } @@ -281,6 +289,11 @@ public long childCleanupLingerMillis() return ENGINE_CREDITOR_CHILD_CLEANUP_LINGER_MILLIS.getAsLong(this); } + public boolean debug() + { + return ENGINE_DEBUG.getAsBoolean(this); + } + public boolean verbose() { return ENGINE_VERBOSE.getAsBoolean(this); @@ -321,6 +334,11 @@ public String cacertsStorePass() return ENGINE_CACERTS_STORE_PASS.get(this); } + public Consumer errorReporter() + { + return ENGINE_ERROR_REPORTER.get(this)::report; + } + public Function hostResolver() { return ENGINE_HOST_RESOLVER.get(this)::resolve; @@ -386,6 +404,13 @@ InetAddress[] resolve( String name); } + @FunctionalInterface + private interface ErrorReporter + { + void report( + Throwable ex); + } + private static String defaultName( Configuration config) { @@ -482,4 +507,47 @@ private static String cacertsStoreDefault( { return System.getProperty("javax.net.ssl.trustStore"); } + + private static ErrorReporter defaultErrorReporter( + Configuration config) + { + boolean exceptions = ENGINE_VERBOSE_EXCEPTIONS.get(config); + + return exceptions + ? e -> e.printStackTrace(System.err) + : e -> System.err.println(e.getMessage()); + } + + private static ErrorReporter decodeErrorReporter( + Configuration config, + String value) + { + ErrorReporter reporter = null; + + try + { + MethodType signature = MethodType.methodType(Void.class, Throwable.class); + String[] parts = value.split("::"); + Class ownerClass = Class.forName(parts[0]); + String methodName = parts[1]; + MethodHandle method = MethodHandles.publicLookup().findStatic(ownerClass, methodName, signature); + reporter = ex -> + { + try + { + method.invoke(ex); + } + catch (Throwable t) + { + LangUtil.rethrowUnchecked(t); + } + }; + } + catch (Throwable ex) + { + LangUtil.rethrowUnchecked(ex); + } + + return reporter; + } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java index 977a4a0064..517b0124d1 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java @@ -74,6 +74,9 @@ MessageConsumer supplyReceiver( EventFormatter supplyEventFormatter(); + void report( + Throwable ex); + void attachComposite( NamespaceConfig composite); diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 586b71300f..f09a793cf2 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -219,6 +219,7 @@ public class EngineWorker implements EngineContext, Agent private final Path configPath; private final AgentRunner runner; private final IdleStrategy idleStrategy; + private final Consumer reporter; private final ErrorHandler errorHandler; private final ScalarsLayout countersLayout; private final ScalarsLayout gaugesLayout; @@ -451,6 +452,7 @@ public EngineWorker( this.taskQueue = new ConcurrentLinkedDeque<>(); this.correlations = new Long2ObjectHashMap<>(); this.idleStrategy = idleStrategy; + this.reporter = config.errorReporter(); this.errorHandler = errorHandler; this.exportersById = new Long2ObjectHashMap<>(); this.supplyEventReader = supplyEventReader; @@ -574,6 +576,13 @@ public long supplyTraceId() return traceId; } + @Override + public void report( + Throwable ex) + { + reporter.accept(ex); + } + @Override public void detachSender( long streamId)