Skip to content

Commit

Permalink
Glue datasource support
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Sep 7, 2023
1 parent 24e01d6 commit d3e0b27
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public enum Key {

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),

CLUSTER_NAME("cluster.name");

@Getter private final String keyValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
public enum DataSourceType {
PROMETHEUS("prometheus"),
OPENSEARCH("opensearch"),
SPARK("spark");
SPARK("spark"),
GLUE("glue");

private String text;

DataSourceType(String text) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.opensearch.sql.storage;

import java.net.URISyntaxException;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand All @@ -23,5 +24,5 @@ public interface DataSourceFactory {
DataSourceType getDataSourceType();

/** Create {@link DataSource}. */
DataSource createDataSource(DataSourceMetadata metadata);
DataSource createDataSource(DataSourceMetadata metadata) throws URISyntaxException;
}
1 change: 1 addition & 0 deletions datasources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation group: 'org.opensearch', name: 'common-utils', version: "${opensearch_build}"
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'
implementation 'com.amazonaws:aws-encryption-sdk-java:2.4.0'
implementation group: 'commons-validator', name: 'commons-validator', version: '1.7'

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.opensearch.sql.datasources.glue;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.RequiredArgsConstructor;
import org.apache.commons.validator.routines.DomainValidator;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.storage.DataSourceFactory;


@RequiredArgsConstructor
public class GlueDatasourceFactory implements DataSourceFactory {

private final Settings pluginSettings;

// Glue configuration properties
public static final String GLUE_AUTH_TYPE = "glue.auth.type";
public static final String GLUE_ACCOUNT_ID = "glue.auth.account_id";
public static final String GLUE_ROLE_ARN = "glue.auth.role_arn";
public static final String GLUE_REGION = "glue.auth.region";

// Flint integration jar configuration properties
public static final String FLINT_INTEGRATION = "spark.datasource.flint.integration";
public static final String FLINT_HOST = "spark.datasource.flint.host";
public static final String FLINT_PORT = "spark.datasource.flint.port";
public static final String FLINT_SCHEME = "spark.datasource.flint.scheme";
public static final String FLINT_AUTH = "spark.datasource.flint.auth";
public static final String FLINT_REGION = "spark.datasource.flint.region";

private static final Integer MAX_LENGTH_FOR_CONFIG_PROPERTY = 1000;
@Override
public DataSourceType getDataSourceType() {
return DataSourceType.GLUE;
}

@Override
public DataSource createDataSource(DataSourceMetadata metadata) throws URISyntaxException {
validateGlueDataSourceConfiguration(metadata.getProperties());
return new DataSource(metadata.getName(), metadata.getConnector(),
(dataSourceSchemaName, tableName) -> {
throw new UnsupportedOperationException("Glue storage engine is not supported.");
});
}

private void validateGlueDataSourceConfiguration(Map<String, String> dataSourceMetadataConfig)
throws URISyntaxException {
validateMissingFields(dataSourceMetadataConfig,
Set.of(GLUE_AUTH_TYPE, GLUE_ROLE_ARN, GLUE_REGION, GLUE_ACCOUNT_ID,
FLINT_INTEGRATION, FLINT_HOST, FLINT_PORT, FLINT_REGION, FLINT_SCHEME, FLINT_AUTH));
validateFlintHost(dataSourceMetadataConfig);
}


private void validateMissingFields(Map<String, String> config, Set<String> fields) {
Set<String> missingFields = new HashSet<>();
Set<String> invalidLengthFields = new HashSet<>();
for (String field : fields) {
if (!config.containsKey(field)) {
missingFields.add(field);
} else if (config.get(field).length() > MAX_LENGTH_FOR_CONFIG_PROPERTY) {
invalidLengthFields.add(field);
}
}
StringBuilder errorStringBuilder = new StringBuilder();
if (missingFields.size() > 0) {
errorStringBuilder.append(
String.format(
"Missing %s fields in the Prometheus connector properties.", missingFields));
}

if (invalidLengthFields.size() > 0) {
errorStringBuilder.append(
String.format("Fields %s exceeds more than 1000 characters.", invalidLengthFields));
}
if (errorStringBuilder.length() > 0) {
throw new IllegalArgumentException(errorStringBuilder.toString());
}
}

private void validateFlintHost(Map<String, String> config) throws URISyntaxException {
URI uri = new URI(config.get(FLINT_HOST));
String host = uri.getHost();
if (host == null
|| (!(DomainValidator.getInstance().isValid(host)
|| DomainValidator.getInstance().isValidLocalTld(host)))) {
throw new IllegalArgumentException(
String.format("Invalid hostname in the uri: %s", config.get(FLINT_HOST)));
} else {
Pattern allowHostsPattern =
Pattern.compile(pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_ALLOWHOSTS));
Matcher matcher = allowHostsPattern.matcher(host);
if (!matcher.matches()) {
throw new IllegalArgumentException(
String.format(
"Disallowed hostname in the uri. Validate with %s config",
Settings.Key.DATASOURCES_URI_ALLOWHOSTS.getKeyValue()));
}
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.sql.datasources.utils;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.experimental.UtilityClass;
import org.apache.commons.validator.routines.DomainValidator;

@UtilityClass
public class DataSourceValidationUtils {

public static void validateHost() {

}

public static void validateMissingFields(String uriString) throws URISyntaxException {
URI uri = new URI(uriString);
String host = uri.getHost();
if (host == null
|| (!(DomainValidator.getInstance().isValid(host)
|| DomainValidator.getInstance().isValidLocalTld(host)))) {
throw new IllegalArgumentException(
String.format("Invalid hostname in the uri: %s", config.get(FLINT_HOST)));
} else {
Pattern allowHostsPattern =
Pattern.compile(pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_ALLOWHOSTS));
Matcher matcher = allowHostsPattern.matcher(host);
if (!matcher.matches()) {
throw new IllegalArgumentException(
String.format(
"Disallowed hostname in the uri. Validate with %s config",
Settings.Key.DATASOURCES_URI_ALLOWHOSTS.getKeyValue()));
}
}
}

}
2 changes: 2 additions & 0 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.datasources.encryptor.EncryptorImpl;
import org.opensearch.sql.datasources.glue.GlueDatasourceFactory;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
Expand Down Expand Up @@ -241,6 +242,7 @@ private DataSourceServiceImpl createDataSourceService() {
new OpenSearchNodeClient(this.client), pluginSettings))
.add(new PrometheusStorageFactory(pluginSettings))
.add(new SparkStorageFactory(this.client, pluginSettings))
.add(new GlueDatasourceFactory(pluginSettings))
.build(),
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
Expand Down

0 comments on commit d3e0b27

Please sign in to comment.