Skip to content

Commit

Permalink
enabling error reporting using supplyReporter (#1348)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitk-me authored Dec 26, 2024
1 parent b3f686c commit 72786eb
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ public EventFormatter supplyEventFormatter()
return null;
}

@Override
public void report(
Throwable ex)
{
}

@Override
public void attachComposite(NamespaceConfig composite)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.binding.tls.internal;

import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_DEBUG;
import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_VERBOSE;

import io.aklivity.zilla.runtime.engine.Configuration;
Expand All @@ -27,6 +28,7 @@ public class TlsConfiguration extends Configuration
public static final LongPropertyDef TLS_AWAIT_SYNC_CLOSE_MILLIS;
public static final BooleanPropertyDef TLS_PROACTIVE_CLIENT_REPLY_BEGIN;
public static final BooleanPropertyDef TLS_VERBOSE;
public static final BooleanPropertyDef TLS_DEBUG;

private static final ConfigurationDef TLS_CONFIG;

Expand All @@ -39,6 +41,7 @@ public class TlsConfiguration extends Configuration
TLS_AWAIT_SYNC_CLOSE_MILLIS = config.property("await.sync.close.millis", 3000L);
TLS_PROACTIVE_CLIENT_REPLY_BEGIN = config.property("proactive.client.reply.begin", false);
TLS_VERBOSE = config.property("verbose", TlsConfiguration::verboseDefault);
TLS_DEBUG = config.property("debug", TlsConfiguration::debugDefault);
TLS_CONFIG = config;
}

Expand Down Expand Up @@ -78,9 +81,20 @@ public boolean verbose()
return TLS_VERBOSE.getAsBoolean(this);
}

public boolean debug()
{
return TLS_DEBUG.getAsBoolean(this);
}

private static boolean verboseDefault(
Configuration config)
{
return ENGINE_VERBOSE.getAsBoolean(config);
}

private static boolean debugDefault(
Configuration config)
{
return ENGINE_DEBUG.getAsBoolean(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void init(
if (keyManagers[i] instanceof X509ExtendedKeyManager)
{
X509ExtendedKeyManager keyManager = (X509ExtendedKeyManager) keyManagers[i];
keyManagers[i] = new TlsClientX509ExtendedKeyManager(keyManager);
keyManagers[i] = new TlsClientX509ExtendedKeyManager(config, keyManager);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509KeyManager;
import javax.security.auth.x500.X500Principal;

import io.aklivity.zilla.runtime.binding.tls.internal.TlsConfiguration;

public final class TlsClientX509ExtendedKeyManager extends X509ExtendedKeyManager implements X509KeyManager
{
public static final String COMMON_NAME_KEY = "common.name";
Expand All @@ -37,10 +41,13 @@ public final class TlsClientX509ExtendedKeyManager extends X509ExtendedKeyManage
private final Matcher matchCN = COMMON_NAME_PATTERN.matcher("");

private final X509ExtendedKeyManager delegate;
private final boolean debug;

public TlsClientX509ExtendedKeyManager(
TlsConfiguration config,
X509ExtendedKeyManager delegate)
{
this.debug = config.debug();
this.delegate = delegate;
}

Expand Down Expand Up @@ -119,6 +126,15 @@ else if (keyTypes != null)
}
}
}
if (debug)
{
System.out.printf("[binding-tls] No match found for Subject CN [%s], Key Types [%s], Issuers [%s] \n",
subjectCN,
String.join(", ", keyTypes),
issuers != null
? Arrays.stream(issuers).map(Principal::getName).collect(Collectors.joining(", "))
: null);
}
}

return alias;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ public EventFormatter supplyEventFormatter()
return null;
}

@Override
public void report(
Throwable ex)
{
}

@Override
public void attachComposite(
NamespaceConfig composite)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_CONFIG_URL;
import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_DIRECTORY;
import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_VERBOSE;
import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_VERBOSE_EXCEPTIONS;
import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKERS;
import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ZILLA_DIRECTORY_PROPERTY;
import static java.lang.Runtime.getRuntime;
Expand All @@ -31,7 +32,6 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import org.agrona.ErrorHandler;

Expand Down Expand Up @@ -124,6 +124,11 @@ public void run()
props.setProperty(ENGINE_VERBOSE.name(), Boolean.toString(verbose));
}

if (exceptions)
{
props.setProperty(ENGINE_VERBOSE_EXCEPTIONS.name(), Boolean.toString(exceptions));
}

EngineConfiguration config = new EngineConfiguration(props);

Path configPath = Path.of(config.configURI());
Expand All @@ -142,13 +147,8 @@ public void run()
}
}

final Consumer<Throwable> report = exceptions
? e -> e.printStackTrace(System.err)
: e -> System.err.println(e.getMessage());

final ErrorHandler onError = ex ->
{
report.accept(ex);
stop.countDown();
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

import org.agrona.ErrorHandler;

Expand Down Expand Up @@ -143,8 +144,15 @@ public Engine build()
EventFormatterFactory eventFormatterFactory = EventFormatterFactory.instantiate();

final ErrorHandler errorHandler = requireNonNull(this.errorHandler, "errorHandler");
final Consumer<Throwable> reporter = config.errorReporter();

final ErrorHandler onError = ex ->
{
reporter.accept(ex);
errorHandler.onError(ex);
};

return new Engine(config, bindings, exporters, guards, metricGroups, vaults,
catalogs, models, eventFormatterFactory, errorHandler, affinities, readonly);
catalogs, models, eventFormatterFactory, onError, affinities, readonly);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;

import org.agrona.LangUtil;
Expand Down Expand Up @@ -72,14 +73,17 @@ public class EngineConfiguration extends Configuration
public static final BooleanPropertyDef ENGINE_SYNTHETIC_ABORT;
public static final LongPropertyDef ENGINE_ROUTED_DELAY_MILLIS;
public static final LongPropertyDef ENGINE_CREDITOR_CHILD_CLEANUP_LINGER_MILLIS;
public static final BooleanPropertyDef ENGINE_DEBUG;
public static final BooleanPropertyDef ENGINE_VERBOSE;
public static final BooleanPropertyDef ENGINE_VERBOSE_EXCEPTIONS;
public static final BooleanPropertyDef ENGINE_VERBOSE_SCHEMA;
public static final BooleanPropertyDef ENGINE_VERBOSE_SCHEMA_PLAIN;
public static final BooleanPropertyDef ENGINE_VERBOSE_COMPOSITES;
public static final IntPropertyDef ENGINE_WORKERS;
public static final PropertyDef<String> ENGINE_CACERTS_STORE_TYPE;
public static final PropertyDef<String> ENGINE_CACERTS_STORE;
public static final PropertyDef<String> ENGINE_CACERTS_STORE_PASS;
public static final PropertyDef<ErrorReporter> ENGINE_ERROR_REPORTER;

private static final ConfigurationDef ENGINE_CONFIG;

Expand Down Expand Up @@ -119,14 +123,18 @@ public class EngineConfiguration extends Configuration
ENGINE_SYNTHETIC_ABORT = config.property("synthetic.abort", false);
ENGINE_ROUTED_DELAY_MILLIS = config.property("routed.delay.millis", 0L);
ENGINE_CREDITOR_CHILD_CLEANUP_LINGER_MILLIS = config.property("child.cleanup.linger", SECONDS.toMillis(5L));
ENGINE_DEBUG = config.property("debug", false);
ENGINE_VERBOSE = config.property("verbose", false);
ENGINE_VERBOSE_COMPOSITES = config.property("verbose.composites", false);
ENGINE_VERBOSE_SCHEMA = config.property("verbose.schema", false);
ENGINE_VERBOSE_SCHEMA_PLAIN = config.property("verbose.schema.plain", false);
ENGINE_VERBOSE_EXCEPTIONS = config.property("exception-traces", false);
ENGINE_WORKERS = config.property("workers", Runtime.getRuntime().availableProcessors());
ENGINE_CACERTS_STORE_TYPE = config.property("cacerts.store.type", EngineConfiguration::cacertsStoreTypeDefault);
ENGINE_CACERTS_STORE = config.property("cacerts.store", EngineConfiguration::cacertsStoreDefault);
ENGINE_CACERTS_STORE_PASS = config.property("cacerts.store.pass");
ENGINE_ERROR_REPORTER = config.property(ErrorReporter.class, "error.reporter",
EngineConfiguration::decodeErrorReporter, EngineConfiguration::defaultErrorReporter);
ENGINE_CONFIG = config;
}

Expand Down Expand Up @@ -281,6 +289,11 @@ public long childCleanupLingerMillis()
return ENGINE_CREDITOR_CHILD_CLEANUP_LINGER_MILLIS.getAsLong(this);
}

public boolean debug()
{
return ENGINE_DEBUG.getAsBoolean(this);
}

public boolean verbose()
{
return ENGINE_VERBOSE.getAsBoolean(this);
Expand Down Expand Up @@ -321,6 +334,11 @@ public String cacertsStorePass()
return ENGINE_CACERTS_STORE_PASS.get(this);
}

public Consumer<Throwable> errorReporter()
{
return ENGINE_ERROR_REPORTER.get(this)::report;
}

public Function<String, InetAddress[]> hostResolver()
{
return ENGINE_HOST_RESOLVER.get(this)::resolve;
Expand Down Expand Up @@ -386,6 +404,13 @@ InetAddress[] resolve(
String name);
}

@FunctionalInterface
private interface ErrorReporter
{
void report(
Throwable ex);
}

private static String defaultName(
Configuration config)
{
Expand Down Expand Up @@ -482,4 +507,47 @@ private static String cacertsStoreDefault(
{
return System.getProperty("javax.net.ssl.trustStore");
}

private static ErrorReporter defaultErrorReporter(
Configuration config)
{
boolean exceptions = ENGINE_VERBOSE_EXCEPTIONS.get(config);

return exceptions
? e -> e.printStackTrace(System.err)
: e -> System.err.println(e.getMessage());
}

private static ErrorReporter decodeErrorReporter(
Configuration config,
String value)
{
ErrorReporter reporter = null;

try
{
MethodType signature = MethodType.methodType(Void.class, Throwable.class);
String[] parts = value.split("::");
Class<?> ownerClass = Class.forName(parts[0]);
String methodName = parts[1];
MethodHandle method = MethodHandles.publicLookup().findStatic(ownerClass, methodName, signature);
reporter = ex ->
{
try
{
method.invoke(ex);
}
catch (Throwable t)
{
LangUtil.rethrowUnchecked(t);
}
};
}
catch (Throwable ex)
{
LangUtil.rethrowUnchecked(ex);
}

return reporter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ MessageConsumer supplyReceiver(

EventFormatter supplyEventFormatter();

void report(
Throwable ex);

void attachComposite(
NamespaceConfig composite);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public class EngineWorker implements EngineContext, Agent
private final Path configPath;
private final AgentRunner runner;
private final IdleStrategy idleStrategy;
private final Consumer<Throwable> reporter;
private final ErrorHandler errorHandler;
private final ScalarsLayout countersLayout;
private final ScalarsLayout gaugesLayout;
Expand Down Expand Up @@ -451,6 +452,7 @@ public EngineWorker(
this.taskQueue = new ConcurrentLinkedDeque<>();
this.correlations = new Long2ObjectHashMap<>();
this.idleStrategy = idleStrategy;
this.reporter = config.errorReporter();
this.errorHandler = errorHandler;
this.exportersById = new Long2ObjectHashMap<>();
this.supplyEventReader = supplyEventReader;
Expand Down Expand Up @@ -574,6 +576,13 @@ public long supplyTraceId()
return traceId;
}

@Override
public void report(
Throwable ex)
{
reporter.accept(ex);
}

@Override
public void detachSender(
long streamId)
Expand Down

0 comments on commit 72786eb

Please sign in to comment.