Skip to content

Commit

Permalink
Add Tenant aware Rest Tests for Workflows
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Dec 27, 2024
1 parent c5a01c5 commit 476256c
Show file tree
Hide file tree
Showing 11 changed files with 635 additions and 54 deletions.
18 changes: 15 additions & 3 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,22 @@ jobs:
if: github.repository == 'opensearch-project/flow-framework'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v4
with:
java-version: 21
java-version: ${{ matrix.java }}
distribution: temurin
# TEMPORARY until this is on Maven
- name: Checkout Metadata Client
uses: actions/checkout@v4
with:
repository: dbwiddis/opensearch-remote-metadata-sdk
ref: main
path: opensearch-remote-metadata-sdk
- name: Publish to maven local
run: ./gradlew publishToMavenLocal
# end TEMPORARY code
- uses: actions/checkout@v4
- name: Javadoc CheckStyle
run: ./gradlew checkstyleMain
- name: Javadoc Check
Expand Down Expand Up @@ -98,3 +109,4 @@ jobs:
- name: Build and Run Tests
run: |
./gradlew integTest -PnumNodes=3
./gradlew integTest -PnumNodes=3 -Dtests.rest.tenantaware=true
33 changes: 32 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,19 @@ integTest {
systemProperty('user', user)
systemProperty('password', password)

// Only tenant aware test if set
if (System.getProperty("tests.rest.tenantaware") == "true") {
filter {
includeTestsMatching "org.opensearch.flowframework.*TenantAwareIT"
}
systemProperty "plugins.flow_framework.multi_tenancy_enabled", "true"
}

// Only rest case can run with remote cluster
if (System.getProperty("tests.rest.cluster") != null) {
if (System.getProperty("tests.rest.cluster") != null && System.getProperty("tests.rest.tenantaware") == null) {
filter {
includeTestsMatching "org.opensearch.flowframework.rest.*IT"
excludeTestsMatching "org.opensearch.ml.rest.*TenantAwareIT"
}
}

Expand Down Expand Up @@ -319,6 +328,28 @@ integTest {

// doFirst delays this block until execution time
doFirst {
if (System.getProperty("tests.rest.tenantaware") == "true") {
def ymlFile = file("$buildDir/testclusters/integTest-0/config/opensearch.yml")
if (ymlFile.exists()) {
ymlFile.withWriterAppend {
writer ->
writer.write("\n# Set multitenancy\n")
writer.write("plugins.flow_framework.multi_tenancy_enabled: true\n")
}
// TODO this properly uses the remote client factory but needs a remote cluster set up
// TODO get the endpoint from a system property
if (System.getProperty("tests.rest.cluster") != null) {
ymlFile.withWriterAppend { writer ->
writer.write("\n# Use a remote cluster\n")
writer.write("plugins.flow_framework.remote_metadata_type: RemoteOpenSearch\n")
writer.write("plugins.flow_framework.remote_metadata_endpoint: https://127.0.0.1:9200\n")
}
}
} else {
throw new GradleException("opensearch.yml not found at: $ymlFile")
}
}

// Tell the test JVM if the cluster JVM is running under a debugger so that tests can
// use longer timeouts for requests.
def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ private Builder(Template t) {
this.createdTime = t.createdTime();
this.lastUpdatedTime = t.lastUpdatedTime();
this.lastProvisionedTime = t.lastProvisionedTime();
this.tenantId = t.getTenantId();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,52 +102,56 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
.stream()
.filter(e -> !request.consumedParams().contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
String tenantId = RestActionUtils.getTenantID(flowFrameworkSettings.isMultiTenancyEnabled(), request);
if (!provision && !params.isEmpty()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (provision && updateFields) {
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (reprovision && workflowId == null) {
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use the " + REPROVISION_WORKFLOW + " parameter to create a new template.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (reprovision && useCase != null) {
FlowFrameworkException ffe = new FlowFrameworkException(
"You cannot use the " + REPROVISION_WORKFLOW + " and " + USE_CASE + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (reprovision && !params.isEmpty()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
try {
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
String tenantId = RestActionUtils.getTenantID(flowFrameworkSettings.isMultiTenancyEnabled(), request);
if (!provision && !params.isEmpty()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (provision && updateFields) {
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use both the "
+ PROVISION_WORKFLOW
+ " and "
+ UPDATE_WORKFLOW_FIELDS
+ " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (reprovision && workflowId == null) {
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use the " + REPROVISION_WORKFLOW + " parameter to create a new template.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (reprovision && useCase != null) {
FlowFrameworkException ffe = new FlowFrameworkException(
"You cannot use the " + REPROVISION_WORKFLOW + " and " + USE_CASE + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (reprovision && !params.isEmpty()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
if (useCase != null) {
Expand Down
5 changes: 4 additions & 1 deletion src/main/resources/mappings/global-context.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": false,
"_meta": {
"schema_version": 3
"schema_version": 4
},
"properties": {
"workflow_id": {
Expand Down Expand Up @@ -86,6 +86,9 @@
"type": "date",
"format": "strict_date_time||epoch_millis"
},
"tenant_id": {
"type": "keyword"
},
"ui_metadata": {
"type": "object",
"enabled": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.message.BasicHeader;
Expand Down Expand Up @@ -146,6 +147,19 @@ protected String getProtocol() {
return isHttps() ? "https" : "http";
}

public static Map<String, Object> responseToMap(Response response) throws IOException {
HttpEntity entity = response.getEntity();
assertNotNull(response);
String entityString = TestHelpers.httpEntityToString(entity);
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
entityString
);
parser.nextToken();
return parser.map();
}

// Utility fn for deleting indices. Should only be used when not allowed in a regular context
// (e.g., deleting system indices)
protected static void deleteIndexWithAdminClient(String name) throws IOException {
Expand Down
Loading

0 comments on commit 476256c

Please sign in to comment.