From 448bea0f6e2cc41a1d269e4eca3a1ff1071bd570 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 28 Nov 2024 11:18:33 +0200 Subject: [PATCH] init --- .../hsfs/flink/HopsworksConnection.java | 16 +++++---- .../hsfs/flink/engine/FlinkEngine.java | 34 ------------------- .../hsfs/metadata/HopsworksClient.java | 32 +++++++++++++---- 3 files changed, 36 insertions(+), 46 deletions(-) diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java index 3d8d71d0ff..68195c77f8 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java @@ -20,7 +20,7 @@ import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.HopsworksConnectionBase; import com.logicalclocks.hsfs.SecretStore; -import com.logicalclocks.hsfs.flink.engine.FlinkEngine; +import com.logicalclocks.hsfs.metadata.Credentials; import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.metadata.HopsworksHttpClient; @@ -30,6 +30,9 @@ import software.amazon.awssdk.regions.Region; import java.io.IOException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; public class HopsworksConnection extends HopsworksConnectionBase { @@ -37,7 +40,7 @@ public class HopsworksConnection extends HopsworksConnectionBase { public HopsworksConnection(String host, int port, String project, Region region, SecretStore secretStore, boolean hostnameVerification, String trustStorePath, String certPath, String apiKeyFilePath, String apiKeyValue) - throws IOException, FeatureStoreException { + throws IOException, FeatureStoreException, KeyStoreException, CertificateException, NoSuchAlgorithmException { this.host = host; this.port = port; this.project = getProjectName(project); @@ -54,10 +57,11 @@ public HopsworksConnection(String host, int port, String project, Region region, this.projectObj = getProject(); HopsworksClient.getInstance().setProject(this.projectObj); if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) { - HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient(); - hopsworksHttpClient.setTrustStorePath(FlinkEngine.getInstance().getTrustStorePath()); - hopsworksHttpClient.setKeyStorePath(FlinkEngine.getInstance().getKeyStorePath()); - hopsworksHttpClient.setCertKey(HopsworksHttpClient.readCertKey(FlinkEngine.getInstance().getCertKey())); + Credentials credentials = HopsworksClient.getInstance().getCredentials(); + HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient(); + hopsworksHttpClient.setTrustStorePath(credentials.gettStore()); + hopsworksHttpClient.setKeyStorePath(credentials.getkStore()); + hopsworksHttpClient.setCertKey(credentials.getPassword()); HopsworksClient.getInstance().setHopsworksHttpClient(hopsworksHttpClient); } } diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java index 9e0645e967..aa6b075375 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java @@ -28,9 +28,6 @@ import lombok.Getter; import org.apache.avro.generic.GenericRecord; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.core.fs.Path; @@ -45,8 +42,6 @@ import java.util.Map; import java.util.Properties; -import static org.apache.flink.configuration.ConfigOptions.key; - public class FlinkEngine extends EngineBase { private static FlinkEngine INSTANCE = null; @@ -60,23 +55,6 @@ public static synchronized FlinkEngine getInstance() throws FeatureStoreExceptio @Getter private StreamExecutionEnvironment streamExecutionEnvironment; - private final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(); - private final ConfigOption keyStorePath = - key("flink.hadoop.hops.ssl.keystore.name") - .stringType() - .defaultValue("trustStore.jks") - .withDescription("path to keyStore.jks"); - private final ConfigOption trustStorePath = - key("flink.hadoop.hops.ssl.truststore.name") - .stringType() - .defaultValue("trustStore.jks") - .withDescription("path to trustStore.jks"); - private final ConfigOption materialPasswdPath = - key("flink.hadoop.hops.ssl.keystores.passwd.name") - .stringType() - .defaultValue("material_passwd") - .withDescription("path to material_passwd"); - private FlinkEngine() throws FeatureStoreException { streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); // Configure the streamExecutionEnvironment @@ -148,16 +126,4 @@ public Map getKafkaConfig(FeatureGroupBase featureGroup, Map