Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1619] Update Flink certificate management #1405

Merged
merged 1 commit into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HopsworksConnectionBase;
import com.logicalclocks.hsfs.SecretStore;
import com.logicalclocks.hsfs.flink.engine.FlinkEngine;
import com.logicalclocks.hsfs.metadata.Credentials;
import com.logicalclocks.hsfs.metadata.HopsworksClient;

import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
Expand All @@ -30,14 +30,17 @@
import software.amazon.awssdk.regions.Region;

import java.io.IOException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;

public class HopsworksConnection extends HopsworksConnectionBase {

@Builder
public HopsworksConnection(String host, int port, String project, Region region, SecretStore secretStore,
boolean hostnameVerification, String trustStorePath,
String certPath, String apiKeyFilePath, String apiKeyValue)
throws IOException, FeatureStoreException {
throws IOException, FeatureStoreException, KeyStoreException, CertificateException, NoSuchAlgorithmException {
this.host = host;
this.port = port;
this.project = getProjectName(project);
Expand All @@ -54,10 +57,11 @@ public HopsworksConnection(String host, int port, String project, Region region,
this.projectObj = getProject();
HopsworksClient.getInstance().setProject(this.projectObj);
if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) {
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
hopsworksHttpClient.setTrustStorePath(FlinkEngine.getInstance().getTrustStorePath());
hopsworksHttpClient.setKeyStorePath(FlinkEngine.getInstance().getKeyStorePath());
hopsworksHttpClient.setCertKey(HopsworksHttpClient.readCertKey(FlinkEngine.getInstance().getCertKey()));
Credentials credentials = HopsworksClient.getInstance().getCredentials();
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
hopsworksHttpClient.setTrustStorePath(credentials.gettStore());
hopsworksHttpClient.setKeyStorePath(credentials.getkStore());
hopsworksHttpClient.setCertKey(credentials.getPassword());
HopsworksClient.getInstance().setHopsworksHttpClient(hopsworksHttpClient);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
import lombok.Getter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.core.fs.Path;
Expand All @@ -45,8 +42,6 @@
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.configuration.ConfigOptions.key;

public class FlinkEngine extends EngineBase {
private static FlinkEngine INSTANCE = null;

Expand All @@ -60,23 +55,6 @@ public static synchronized FlinkEngine getInstance() throws FeatureStoreExceptio
@Getter
private StreamExecutionEnvironment streamExecutionEnvironment;

private final Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
private final ConfigOption<String> keyStorePath =
key("flink.hadoop.hops.ssl.keystore.name")
.stringType()
.defaultValue("trustStore.jks")
.withDescription("path to keyStore.jks");
private final ConfigOption<String> trustStorePath =
key("flink.hadoop.hops.ssl.truststore.name")
.stringType()
.defaultValue("trustStore.jks")
.withDescription("path to trustStore.jks");
private final ConfigOption<String> materialPasswdPath =
key("flink.hadoop.hops.ssl.keystores.passwd.name")
.stringType()
.defaultValue("material_passwd")
.withDescription("path to material_passwd");

private FlinkEngine() throws FeatureStoreException {
streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure the streamExecutionEnvironment
Expand Down Expand Up @@ -148,16 +126,4 @@ public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<Str
config.put("enable.idempotence", "false");
return config;
}

public String getTrustStorePath() {
return flinkConfig.getString(trustStorePath);
}

public String getKeyStorePath() {
return flinkConfig.getString(keyStorePath);
}

public String getCertKey() {
return flinkConfig.getString(materialPasswdPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import software.amazon.awssdk.regions.Region;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -118,21 +120,39 @@ public Credentials getCredentials() throws FeatureStoreException, IOException,
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(new ByteArrayInputStream(Base64.getDecoder().decode(credentials.getkStore())),
credentials.getPassword().toCharArray());
String keyStorePath = System.getProperty("java.io.tmpdir") + "/keyStore.jks";
ks.store(new FileOutputStream(keyStorePath), credentials.getPassword().toCharArray());
File keyStore = createTempFile("keyStore.jks");
ks.store(new FileOutputStream(keyStore), credentials.getPassword().toCharArray());

KeyStore ts = KeyStore.getInstance("JKS");
ts.load(new ByteArrayInputStream(Base64.getDecoder().decode(credentials.gettStore())),
credentials.getPassword().toCharArray());
String trustStorePath = System.getProperty("java.io.tmpdir") + "/trustStore.jks";
ts.store(new FileOutputStream(trustStorePath), credentials.getPassword().toCharArray());
File trustStore = createTempFile("trustStore.jks");
ts.store(new FileOutputStream(trustStore), credentials.getPassword().toCharArray());

credentials.setkStore(keyStorePath);
credentials.settStore(trustStorePath);
credentials.setkStore(keyStore.getAbsolutePath());
credentials.settStore(trustStore.getAbsolutePath());

return credentials;
}

private File createTempFile(String fileName) throws FeatureStoreException {
HopsworksClient hopsworksClient = getInstance();

// Create a File object
File file = Paths.get(
System.getProperty("java.io.tmpdir"),
hopsworksClient.getProject().getProjectName(),
fileName).toFile();

// Ensure parent directories exist
File parentDir = file.getParentFile();
if (parentDir != null && !parentDir.exists()) {
parentDir.mkdirs();
}

return file;
}

@Getter
@Setter
protected HopsworksHttpClient hopsworksHttpClient;
Expand Down
Loading