Skip to content

Commit

Permalink
Glue datasource support
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Sep 8, 2023
1 parent 61d1eb7 commit 5e488f5
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
public enum DataSourceType {
PROMETHEUS("prometheus"),
OPENSEARCH("opensearch"),
SPARK("spark");
SPARK("spark"),
S3GLUE("s3glue");

private String text;

DataSourceType(String text) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.opensearch.sql.datasources.glue;

import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.utils.DatasourceValidationUtils;
import org.opensearch.sql.storage.DataSourceFactory;

@RequiredArgsConstructor
public class GlueDataSourceFactory implements DataSourceFactory {

private final Settings pluginSettings;

// Glue configuration properties
public static final String GLUE_AUTH_TYPE = "glue.auth.type";
public static final String GLUE_ROLE_ARN = "glue.auth.role_arn";
public static final String FLINT_URI = "glue.resultstore.opensearch.uri";
public static final String FLINT_AUTH = "glue.resultstore.opensearch.auth";
public static final String FLINT_REGION = "glue.resultstore.opensearch.region";

@Override
public DataSourceType getDataSourceType() {
return DataSourceType.S3GLUE;
}

@Override
public DataSource createDataSource(DataSourceMetadata metadata) {
try {
validateGlueDataSourceConfiguration(metadata.getProperties());
return new DataSource(
metadata.getName(),
metadata.getConnector(),
(dataSourceSchemaName, tableName) -> {
throw new UnsupportedOperationException("Glue storage engine is not supported.");
});
} catch (URISyntaxException | UnknownHostException e) {
throw new IllegalArgumentException("Invalid flint host in properties.");
}
}

private void validateGlueDataSourceConfiguration(Map<String, String> dataSourceMetadataConfig)
throws URISyntaxException, UnknownHostException {
DatasourceValidationUtils.validateLengthAndRequiredFields(
dataSourceMetadataConfig,
Set.of(GLUE_AUTH_TYPE, GLUE_ROLE_ARN, FLINT_URI, FLINT_REGION, FLINT_AUTH));
DatasourceValidationUtils.validateHost(
dataSourceMetadataConfig.get(FLINT_URI),
pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public static void validateLengthAndRequiredFields(
StringBuilder errorStringBuilder = new StringBuilder();
if (missingFields.size() > 0) {
errorStringBuilder.append(
String.format(
"Missing %s fields in the Prometheus connector properties.", missingFields));
String.format("Missing %s fields in the connector properties.", missingFields));
}

if (invalidLengthFields.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.sql.datasources.glue;

import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

@ExtendWith(MockitoExtension.class)
public class GlueDataSourceFactoryTest {

@Mock private Settings settings;

@Test
void testGetConnectorType() {
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);
Assertions.assertEquals(DataSourceType.S3GLUE, glueDatasourceFactory.getDataSourceType());
}

@Test
@SneakyThrows
void testCreateGLueDatSource() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(Collections.emptyList());
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.resultstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.resultstore.opensearch.auth", "false");
properties.put("glue.resultstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
DataSource dataSource = glueDatasourceFactory.createDataSource(metadata);
Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType());
UnsupportedOperationException unsupportedOperationException =
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
dataSource
.getStorageEngine()
.getTable(new DataSourceSchemaName("my_glue", "default"), "alb_logs"));
Assertions.assertEquals(
"Glue storage engine is not supported.", unsupportedOperationException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithInvalidFlintHost() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(List.of("127.0.0.0/8"));
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.resultstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.resultstore.opensearch.auth", "false");
properties.put("glue.resultstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata));
Assertions.assertEquals(
"Disallowed hostname in the uri. "
+ "Validate with plugins.query.datasources.uri.hosts.denylist config",
illegalArgumentException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithInvalidFlintHostSyntax() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(List.of("127.0.0.0/8"));
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put(
"glue.resultstore.opensearch.uri",
"http://dummyprometheus.com:9090? paramt::localhost:9200");
properties.put("glue.resultstore.opensearch.auth", "false");
properties.put("glue.resultstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata));
Assertions.assertEquals(
"Invalid flint host in properties.", illegalArgumentException.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testValidateLengthAndRequiredFieldsWithAbsentField() {
DatasourceValidationUtils.validateLengthAndRequiredFields(
config, Set.of("s3.uri", "s3.auth.type")));
Assertions.assertEquals(
"Missing [s3.auth.type] fields in the Prometheus connector properties.",
"Missing [s3.auth.type] fields in the connector properties.",
illegalArgumentException.getMessage());
}

Expand All @@ -77,7 +77,7 @@ public void testValidateLengthAndRequiredFieldsWithInvalidLength() {
DatasourceValidationUtils.validateLengthAndRequiredFields(
config, Set.of("s3.uri", "s3.auth.type")));
Assertions.assertEquals(
"Missing [s3.auth.type] fields in the Prometheus connector properties.Fields "
"Missing [s3.auth.type] fields in the connector properties.Fields "
+ "[s3.uri] exceeds more than 1000 characters.",
illegalArgumentException.getMessage());
}
Expand Down
2 changes: 2 additions & 0 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.datasources.encryptor.EncryptorImpl;
import org.opensearch.sql.datasources.glue.GlueDataSourceFactory;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
Expand Down Expand Up @@ -241,6 +242,7 @@ private DataSourceServiceImpl createDataSourceService() {
new OpenSearchNodeClient(this.client), pluginSettings))
.add(new PrometheusStorageFactory(pluginSettings))
.add(new SparkStorageFactory(this.client, pluginSettings))
.add(new GlueDataSourceFactory(pluginSettings))
.build(),
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void testGetStorageEngineWithMissingURI() {
IllegalArgumentException.class,
() -> prometheusStorageFactory.getStorageEngine(properties));
Assertions.assertEquals(
"Missing [prometheus.uri] fields " + "in the Prometheus connector properties.",
"Missing [prometheus.uri] fields " + "in the connector properties.",
exception.getMessage());
}

Expand All @@ -99,7 +99,7 @@ void testGetStorageEngineWithMissingRegionInAWS() {
IllegalArgumentException.class,
() -> prometheusStorageFactory.getStorageEngine(properties));
Assertions.assertEquals(
"Missing [prometheus.auth.region] fields in the " + "Prometheus connector properties.",
"Missing [prometheus.auth.region] fields in the connector properties.",
exception.getMessage());
}

Expand All @@ -118,7 +118,7 @@ void testGetStorageEngineWithLongConfigProperties() {
() -> prometheusStorageFactory.getStorageEngine(properties));
Assertions.assertEquals(
"Missing [prometheus.auth.region] fields in the "
+ "Prometheus connector properties."
+ "connector properties."
+ "Fields [prometheus.uri] exceeds more than 1000 characters.",
exception.getMessage());
}
Expand Down

0 comments on commit 5e488f5

Please sign in to comment.