From 9cf8d51fe0dac500a23b9537aaa7a5f233cfddf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jiri=20Dan=C4=9Bk?= Date: Mon, 4 Mar 2024 14:18:00 +0100 Subject: [PATCH] Add PipelineV2ServerST test --- .../io/odh/test/platform/KFPv2Client.java | 272 ++++++++++++ .../test/e2e/standard/PipelineV2ServerST.java | 401 ++++++++++++++++++ src/test/resources/pipelines/better_v2.yaml | 227 ++++++++++ 3 files changed, 900 insertions(+) create mode 100644 src/main/java/io/odh/test/platform/KFPv2Client.java create mode 100644 src/test/java/io/odh/test/e2e/standard/PipelineV2ServerST.java create mode 100644 src/test/resources/pipelines/better_v2.yaml diff --git a/src/main/java/io/odh/test/platform/KFPv2Client.java b/src/main/java/io/odh/test/platform/KFPv2Client.java new file mode 100644 index 00000000..13da2acc --- /dev/null +++ b/src/main/java/io/odh/test/platform/KFPv2Client.java @@ -0,0 +1,272 @@ +/* + * Copyright Skodjob authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.odh.test.platform; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import io.odh.test.TestUtils; +import io.odh.test.platform.httpClient.MultipartFormDataBodyPublisher; +import lombok.SneakyThrows; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static io.odh.test.TestUtils.DEFAULT_TIMEOUT_DURATION; +import static io.odh.test.TestUtils.DEFAULT_TIMEOUT_UNIT; +import static org.hamcrest.MatcherAssert.assertThat; + +// https://www.kubeflow.org/docs/components/pipelines/v2/reference/api/kubeflow-pipeline-api-spec/ +public class KFPv2Client { + private final ObjectMapper objectMapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + private final HttpClient httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .followRedirects(HttpClient.Redirect.NORMAL) + .build(); + + private final String baseUrl; + + public KFPv2Client(String baseUrl) { + this.baseUrl = baseUrl; + } + + @SneakyThrows + public Pipeline importPipeline(String name, String description, String filePath) { + MultipartFormDataBodyPublisher requestBody = new MultipartFormDataBodyPublisher() + .addFile("uploadfile", Path.of(filePath), "application/yaml"); + + HttpRequest createPipelineRequest = HttpRequest.newBuilder() + .uri(new URI(baseUrl + "/apis/v2beta1/pipelines/upload?name=%s&description=%s".formatted(name, description))) + .header("Content-Type", requestBody.contentType()) + .POST(requestBody) + .timeout(Duration.of(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT.toChronoUnit())) + .build(); + HttpResponse responseCreate = httpClient.send(createPipelineRequest, HttpResponse.BodyHandlers.ofString()); + + assertThat(responseCreate.body(), responseCreate.statusCode(), Matchers.is(200)); + + return objectMapper.readValue(responseCreate.body(), Pipeline.class); + } + + @SneakyThrows + public @Nonnull List listPipelines() { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/apis/v2beta1/pipelines")) + .GET() + .timeout(Duration.of(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT.toChronoUnit())) + .build(); + + HttpResponse reply = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + Assertions.assertEquals(reply.statusCode(), 200, reply.body()); + + ListPipelinesResponse json = objectMapper.readValue(reply.body(), ListPipelinesResponse.class); + List pipelines = json.pipelines; + + return pipelines == null ? Collections.emptyList() : pipelines; + } + + @SneakyThrows + public @Nonnull List listPipelineVersions(String pipelineId) { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/apis/v2beta1/pipelines/" + pipelineId + "/versions")) + .GET() + .timeout(Duration.of(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT.toChronoUnit())) + .build(); + + HttpResponse reply = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + Assertions.assertEquals(reply.statusCode(), 200, reply.body()); + + ListPipelineVersionsResponse json = objectMapper.readValue(reply.body(), ListPipelineVersionsResponse.class); + List pipelineVersions = json.pipelineVersions; + + return pipelineVersions == null ? Collections.emptyList() : pipelineVersions; + } + + @SneakyThrows + public PipelineRun runPipeline(String pipelineTestRunBasename, String pipelineId, Map parameters, String immediate) { + Assertions.assertEquals(immediate, "Immediate"); + + PipelineRun pipelineRun = new PipelineRun(); + pipelineRun.displayName = pipelineTestRunBasename; + pipelineRun.pipelineVersionReference = new PipelineVersionReference(); + pipelineRun.pipelineVersionReference.pipeline_id = pipelineId; + if (parameters != null) { + pipelineRun.runtimeConfig = new RuntimeConfig(); + pipelineRun.runtimeConfig.parameters = parameters; + } + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/apis/v2beta1/runs")) + .POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(pipelineRun))) + .timeout(Duration.of(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT.toChronoUnit())) + .build(); + HttpResponse reply = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + + Assertions.assertEquals(reply.statusCode(), 200, reply.body()); + return objectMapper.readValue(reply.body(), PipelineRun.class); + } + + @SneakyThrows + public List getPipelineRunStatus() { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/apis/v2beta1/runs")) + .GET() + .timeout(Duration.of(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT.toChronoUnit())) + .build(); + HttpResponse reply = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + + Assertions.assertEquals(reply.statusCode(), 200, reply.body()); + return objectMapper.readValue(reply.body(), ApiListRunsResponse.class).runs; + } + + @SneakyThrows + public PipelineRun waitForPipelineRun(String pipelineRunId) { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/apis/v2beta1/runs/" + pipelineRunId)) + .GET() + .timeout(Duration.of(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT.toChronoUnit())) + .build(); + + AtomicReference run = new AtomicReference<>(); + TestUtils.waitFor("pipelineRun to complete", 5000, 10 * 60 * 1000, () -> { + HttpResponse reply = null; + try { + reply = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + Assertions.assertEquals(reply.statusCode(), 200, reply.body()); + run.set(objectMapper.readValue(reply.body(), PipelineRun.class)); + String state = run.get().state; + if (state == null) { + return false; // e.g. pod has not been deployed + } + // https://github.com/kubeflow/pipelines/issues/7705 + return switch (state) { + case "SUCCEEDED" -> true; + case "PENDING", "RUNNING" -> false; + case "SKIPPED", "FAILED", "CANCELING", "CANCELED", "PAUSED" -> + throw new AssertionError("Pipeline run failed: " + state + run.get().error); + default -> throw new AssertionError("Unexpected pipeline run status: " + state + run.get().error); + }; + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + return run.get(); + } + + @SneakyThrows + public void deletePipelineRun(String runId) { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/apis/v2beta1/runs/" + runId)) + .DELETE() + .timeout(Duration.of(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT.toChronoUnit())) + .build(); + HttpResponse reply = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + Assertions.assertEquals(200, reply.statusCode(), reply.body()); + } + + @SneakyThrows + public void deletePipeline(String pipelineId) { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/apis/v2beta1/pipelines/" + pipelineId)) + .DELETE() + .timeout(Duration.of(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT.toChronoUnit())) + .build(); + HttpResponse reply = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + Assertions.assertEquals(200, reply.statusCode(), reply.body()); + } + + @SneakyThrows + public void deletePipelineVersion(String pipelineId, String pipelineVersionId) { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/apis/v2beta1/pipelines/" + pipelineId + "/versions/" + pipelineVersionId)) + .DELETE() + .timeout(Duration.of(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT.toChronoUnit())) + .build(); + HttpResponse reply = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + Assertions.assertEquals(200, reply.statusCode(), reply.body()); + } + + /// helpers for reading json responses + /// there is openapi spec, so this can be generated + + public static class ListPipelinesResponse { + public List pipelines; + public int totalSize; + public String nextPageToken; + } + + public static class Pipeline { + public String pipelineId; + public String displayName; + } + + public static class ListPipelineVersionsResponse { + public List pipelineVersions; + public int totalSize; + public String nextPageToken; + } + + public static class PipelineVersion { + public String pipelineVersionId; + public String displayName; + } + + public static class ApiListRunsResponse { + public List runs; + public int totalSize; + public String nextPageToken; + } + + public static class PipelineRun { + public String runId; + public String displayName; + public String pipelineVersionId; + public PipelineVersionReference pipelineVersionReference; + public RuntimeConfig runtimeConfig; + + public String createdAt; + public String scheduledAt; + public String finishedAt; + public RunDetails runDetails; + + public String state; // "PENDING", ... + public String error; + } + + public static class RunDetails { + public String pipelineContextId; + public String pipelineRunContextId; + public Object taskDetails; + } + + public static class PipelineVersionReference { + public String pipelineId; + public String pipelineVersionId; + } + + public static class RuntimeConfig { + public Object parameters; + public String pipelineRoot; + } +} diff --git a/src/test/java/io/odh/test/e2e/standard/PipelineV2ServerST.java b/src/test/java/io/odh/test/e2e/standard/PipelineV2ServerST.java new file mode 100644 index 00000000..11f1bd25 --- /dev/null +++ b/src/test/java/io/odh/test/e2e/standard/PipelineV2ServerST.java @@ -0,0 +1,401 @@ +/* + * Copyright Skodjob authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.odh.test.e2e.standard; + +import io.fabric8.kubernetes.api.model.ContainerStateTerminated; +import io.fabric8.kubernetes.api.model.ContainerStatus; +import io.fabric8.kubernetes.api.model.EndpointSubset; +import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.kubernetes.api.model.Namespace; +import io.fabric8.kubernetes.api.model.NamespaceBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.api.model.SecretBuilder; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.LocalPortForward; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.kubernetes.client.dsl.ServiceResource; +import io.fabric8.openshift.api.model.Route; +import io.fabric8.openshift.client.OpenShiftClient; +import io.odh.test.Environment; +import io.odh.test.OdhAnnotationsLabels; +import io.odh.test.TestConstants; +import io.odh.test.TestUtils; +import io.odh.test.framework.listeners.ResourceManagerDeleteHandler; +import io.odh.test.framework.manager.ResourceManager; +import io.odh.test.platform.KFPv2Client; +import io.odh.test.utils.DscUtils; +import io.opendatahub.datasciencecluster.v1.DataScienceCluster; +import io.opendatahub.datasciencecluster.v1.DataScienceClusterBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.ComponentsBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.Codeflare; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.CodeflareBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.Dashboard; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.DashboardBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.Datasciencepipelines; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.DatasciencepipelinesBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.Kserve; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.KserveBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.Kueue; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.KueueBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.Modelmeshserving; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.ModelmeshservingBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.Ray; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.RayBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.Workbenches; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.WorkbenchesBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.datasciencepipelines.DevFlagsBuilder; +import io.opendatahub.datasciencecluster.v1.datascienceclusterspec.components.datasciencepipelines.devflags.ManifestsBuilder; +import io.opendatahub.datasciencepipelinesapplications.v1alpha1.DataSciencePipelinesApplication; +import io.opendatahub.datasciencepipelinesapplications.v1alpha1.DataSciencePipelinesApplicationBuilder; +import io.opendatahub.datasciencepipelinesapplications.v1alpha1.datasciencepipelinesapplicationspec.ApiServer; +import io.opendatahub.dscinitialization.v1.DSCInitialization; +import io.skodjob.annotations.Contact; +import io.skodjob.annotations.Desc; +import io.skodjob.annotations.Step; +import io.skodjob.annotations.SuiteDoc; +import io.skodjob.annotations.TestDoc; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; + +@SuiteDoc( + description = @Desc("Verifies simple setup of ODH by spin-up operator, setup DSCI, and setup DSC."), + beforeTestSteps = { + @Step(value = "Deploy Pipelines Operator", expected = "Pipelines operator is available on the cluster"), + @Step(value = "Deploy ServiceMesh Operator", expected = "ServiceMesh operator is available on the cluster"), + @Step(value = "Deploy Serverless Operator", expected = "Serverless operator is available on the cluster"), + @Step(value = "Install ODH operator", expected = "Operator is up and running and is able to serve it's operands"), + @Step(value = "Deploy DSCI", expected = "DSCI is created and ready"), + @Step(value = "Deploy DSC", expected = "DSC is created and ready") + }, + afterTestSteps = { + @Step(value = "Delete ODH operator and all created resources", expected = "Operator is removed and all other resources as well") + } +) +@ExtendWith(ResourceManagerDeleteHandler.class) +public class PipelineV2ServerST extends StandardAbstract { + private static final Logger LOGGER = LoggerFactory.getLogger(PipelineV2ServerST.class); + + private static final String DS_PROJECT_NAME = "test-pipelines"; + + private final ResourceManager resourceManager = ResourceManager.getInstance(); + private final KubernetesClient client = ResourceManager.getKubeClient().getClient(); + + @BeforeAll + void deployDataScienceCluster() { + if (Environment.SKIP_DEPLOY_DSCI_DSC) { + LOGGER.info("DSCI and DSC deploy is skipped"); + return; + } + + // Create DSCI + DSCInitialization dsci = DscUtils.getBasicDSCI(); + // Create DSC + DataScienceCluster dsc = new DataScienceClusterBuilder() + .withNewMetadata() + .withName(DS_PROJECT_NAME) + .endMetadata() + .withNewSpec() + .withComponents( + new ComponentsBuilder() + .withWorkbenches( + new WorkbenchesBuilder().withManagementState(Workbenches.ManagementState.MANAGED).build() + ) + .withDashboard( + new DashboardBuilder().withManagementState(Dashboard.ManagementState.MANAGED).build() + ) + .withKserve( + new KserveBuilder().withManagementState(Kserve.ManagementState.MANAGED).build() + ) + .withKueue( + new KueueBuilder().withManagementState(Kueue.ManagementState.MANAGED).build() + ) + .withCodeflare( + new CodeflareBuilder().withManagementState(Codeflare.ManagementState.MANAGED).build() + ) + .withDatasciencepipelines( + new DatasciencepipelinesBuilder() + .withManagementState(Datasciencepipelines.ManagementState.MANAGED) + // https://github.com/opendatahub-io/data-science-pipelines-operator/blob/main/datasciencecluster/datasciencecluster.yaml + .withDevFlags( + new DevFlagsBuilder() + .withManifests( + List.of( + new ManifestsBuilder() + .withUri("https://github.com/opendatahub-io/data-science-pipelines-operator/tarball/main") + .withContextDir("config") + .withSourcePath("overlays/odh") + .build() + ) + ) + .build() + ) + .build() + ) + .withModelmeshserving( + new ModelmeshservingBuilder().withManagementState(Modelmeshserving.ManagementState.MANAGED).build() + ) + .withRay( + new RayBuilder().withManagementState(Ray.ManagementState.MANAGED).build() + ) + .build()) + .endSpec() + .build(); + + ResourceManager.getInstance().createResourceWithWait(dsci); + ResourceManager.getInstance().createResourceWithWait(dsc); + } + + /// ODS-2206 - Verify user can create and run a data science pipeline in DS Project + /// ODS-2226 - Verify user can delete components of data science pipeline from DS Pipelines page + /// https://issues.redhat.com/browse/RHODS-5133 + @TestDoc( + description = @Desc("Check that user can create, run and deleted DataSciencePipeline from a DataScience project"), + contact = @Contact(name = "Jiri Danek", email = "jdanek@redhat.com"), + steps = { + @Step(value = "Create namespace for DataSciencePipelines application with proper name, labels and annotations", expected = "Namespace is created"), + @Step(value = "Create Minio secret with proper data for access s3", expected = "Secret is created"), + @Step(value = "Create DataSciencePipelinesApplication with configuration for new Minio instance and new MariaDB instance", expected = "DataSciencePipelinesApplication resource is created"), + @Step(value = "Wait for DataSciencePipelines server readiness", expected = "DSP API endpoint is available and it return proper data"), + @Step(value = "Import pipeline to a pipeline server via API", expected = "Pipeline is imported"), + @Step(value = "List imported pipeline via API", expected = "Server return list with imported pipeline info"), + @Step(value = "Trigger pipeline run for imported pipeline", expected = "Pipeline is triggered"), + @Step(value = "Wait for pipeline success", expected = "Pipeline succeeded"), + @Step(value = "Delete pipeline run", expected = "Pipeline run is deleted"), + @Step(value = "Delete pipeline", expected = "Pipeline is deleted"), + } + ) + + @Test + void testUserCanCreateRunAndDeleteADSPipelineFromDSProject() throws IOException { + OpenShiftClient ocClient = (OpenShiftClient) client; + + final String pipelineTestName = "pipeline-test-name"; + final String pipelineTestDesc = "pipeline-test-desc"; + final String prjTitle = "pipeline-test"; + final String pipelineTestFilepath = "src/test/resources/pipelines/better_v2.yaml"; + final String pipelineWorkflowName = "iris-training-pipeline"; + final String pipelineTestRunBasename = "pipeline-test-run-basename"; + + final String secretName = "mlpipeline-minio-artifact"; // todo: can't use custom name in v2, bug? + + // create project + Namespace ns = new NamespaceBuilder() + .withNewMetadata() + .withName(prjTitle) + .addToLabels(OdhAnnotationsLabels.LABEL_DASHBOARD, "true") + .addToAnnotations(OdhAnnotationsLabels.ANNO_SERVICE_MESH, "false") + .endMetadata() + .build(); + ResourceManager.getInstance().createResourceWithWait(ns); + + // create minio secret + Secret secret = new SecretBuilder() + .withNewMetadata() + .withName(secretName) + .addToLabels("opendatahub.io/dashboard", "true") + .withNamespace(prjTitle) + .endMetadata() + .addToStringData("AWS_ACCESS_KEY_ID", "KEY007") + .addToStringData("AWS_S3_BUCKET", "HolyGrail") + .addToStringData("AWS_SECRET_ACCESS_KEY", "gimmeAccessPlz") + .withType("Opaque") + .build(); + ResourceManager.getInstance().createResourceWithWait(secret); + + // configure pipeline server (with minio, not AWS bucket) + DataSciencePipelinesApplication dspa = new DataSciencePipelinesApplicationBuilder() + .withNewMetadata() + .withName("pipelines-definition") + .withNamespace(prjTitle) + .endMetadata() + .withNewSpec() + // https://github.com/opendatahub-io/data-science-pipelines-operator/blob/main/config/samples/v2/dspa-simple/dspa_simple.yaml + .withDspVersion("v2") + .withNewMlpipelineUI() + .withImage("quay.io/opendatahub/ds-pipelines-frontend:latest") + .endMlpipelineUI() + // todo: v1 values below, this will need review and updating + .withNewApiServer() + .withApplyTektonCustomResource(true) + .withArchiveLogs(false) + .withAutoUpdatePipelineDefaultVersion(true) + .withCollectMetrics(true) + .withDbConfigConMaxLifetimeSec(120L) + .withDeploy(true) + .withEnableOauth(true) + .withEnableSamplePipeline(false) + .withInjectDefaultScript(true) + .withStripEOF(true) + .withTerminateStatus(ApiServer.TerminateStatus.CANCELLED) + .withTrackArtifacts(true) + .endApiServer() + .withNewDatabase() + .withDisableHealthCheck(false) + .withNewMariaDB() + .withDeploy(true) + .withPipelineDBName("mlpipeline") + .withNewPvcSize("10Gi") + .withUsername("mlpipeline") + .endMariaDB() + .endDatabase() + // todo: 2024-03-04T08:12:51Z INFO Encountered error when parsing CR: + // [MLMD explicitly disabled in DSPA, but is a required component for V2 Pipelines] + // {"namespace": "pipeline-test", "dspa_name": "pipelines-definition"} │ + .withNewMlmd() + .withDeploy(true) + .endMlmd() + .withNewObjectStorage() + .withDisableHealthCheck(false) + // NOTE: ods-ci uses aws, but minio is more appropriate here + // todo: │ Warning Failed 7s (x4 over 44s) kubelet Error: secret "mlpipeline-minio-artifact" not found + .withNewMinio() + .withDeploy(true) + .withImage("quay.io/minio/minio") + .withNewPvcSize("1Gi") + .withBucket("HolyGrail") + .withNewS3CredentialsSecret() + .withAccessKey("AWS_ACCESS_KEY_ID") + .withSecretKey("AWS_SECRET_ACCESS_KEY") + .withSecretName(secretName) + .endMinioS3CredentialsSecret() + .endMinio() + .endObjectStorage() + .withNewPersistenceAgent() + .withDeploy(true) + .withNumWorkers(2L) + .endPersistenceAgent() + .withNewScheduledWorkflow() + .withCronScheduleTimezone("UTC") + .withDeploy(true) + .endScheduledWorkflow() + .endSpec() + .build(); + ResourceManager.getInstance().createResourceWithWait(dspa); + + // wait for pipeline api server to come up + Resource endpoints = client.endpoints().inNamespace(prjTitle).withName("ds-pipeline-pipelines-definition"); + waitForEndpoints(endpoints); + + // connect to the api server we just created, route not available unless I enable oauth + Resource route = ocClient.routes() + .inNamespace(prjTitle).withName("ds-pipeline-pipelines-definition"); + + // TODO(jdanek) I still don't know how to do oauth, so let's forward a port + ServiceResource svc = client.services().inNamespace(prjTitle).withName("ds-pipeline-pipelines-definition"); + try (LocalPortForward portForward = svc.portForward(8888, 0)) { + KFPv2Client kfpClient = new KFPv2Client("http://localhost:%d".formatted(portForward.getLocalPort())); + + // WORKAROUND(RHOAIENG-3250): delete sample pipeline present on ODH + if (Environment.PRODUCT.equals(Environment.PRODUCT_DEFAULT)) { + for (KFPv2Client.Pipeline pipeline : kfpClient.listPipelines()) { + kfpClient.deletePipeline(pipeline.pipelineId); + } + } + + for (KFPv2Client.Pipeline pipeline : kfpClient.listPipelines()) { + for (KFPv2Client.PipelineVersion pipelineVersion : kfpClient.listPipelineVersions(pipeline.pipelineId)) { + kfpClient.deletePipelineVersion(pipeline.pipelineId, pipelineVersion.pipelineVersionId); + } + kfpClient.deletePipeline(pipeline.pipelineId); + } + + KFPv2Client.Pipeline importedPipeline = kfpClient.importPipeline(pipelineTestName, pipelineTestDesc, pipelineTestFilepath); + + List pipelines = kfpClient.listPipelines(); + assertThat(pipelines.stream().map(p -> p.displayName).collect(Collectors.toList()), Matchers.contains(pipelineTestName)); + + Map parameters = Map.of( + "min_max_scaler", false, + "neighbors", 1, + "standard_scaler", true + ); + KFPv2Client.PipelineRun pipelineRun = kfpClient.runPipeline(pipelineTestRunBasename, importedPipeline.pipelineId, parameters, "Immediate"); + + kfpClient.waitForPipelineRun(pipelineRun.runId); + + List statuses = kfpClient.getPipelineRunStatus(); + assertThat(statuses.stream() + .filter(run -> run.runId.equals(pipelineRun.runId)) + .map(run -> run.state) + .findFirst().get(), Matchers.is("SUCCEEDED")); + + // todo: don't know where to get what used to be `pipelineRun.runId.substring(0, 5)` +// checkPipelineRunK8sDeployments(prjTitle, pipelineWorkflowName + "-" + pipelineRun.runId.substring(0, 5)); + + kfpClient.deletePipelineRun(pipelineRun.runId); + for (KFPv2Client.PipelineVersion pipelineVersion : kfpClient.listPipelineVersions(importedPipeline.pipelineId)) { + kfpClient.deletePipelineVersion(importedPipeline.pipelineId, pipelineVersion.pipelineVersionId); + } + kfpClient.deletePipeline(importedPipeline.pipelineId); + } + } + + private void checkPipelineRunK8sDeployments(String prjTitle, String workflowName) { + List argoTaskPods = client.pods().inNamespace(prjTitle).withLabel("workflows.argoproj.io/workflow=" + workflowName).list().getItems(); + Assertions.assertEquals(4, argoTaskPods.size()); + + for (Pod pod : argoTaskPods) { + Assertions.assertEquals("Succeeded", pod.getStatus().getPhase()); + + List containerStatuses = pod.getStatus().getContainerStatuses(); + Assertions.assertNotEquals(0, containerStatuses.size()); + for (ContainerStatus containerStatus : containerStatuses) { + ContainerStateTerminated terminated = containerStatus.getState().getTerminated(); + Assertions.assertNotNull(terminated); + Assertions.assertEquals(0, terminated.getExitCode()); + Assertions.assertEquals("Completed", terminated.getReason()); + } + } + + List expectedNodeNames = List.of( + workflowName + ".root.data-prep-driver", + workflowName + ".root.train-model-driver", + workflowName + ".root.data-prep.executor", + workflowName + ".root-driver"); + List argoNodeNames = argoTaskPods.stream() + .map(pod -> pod.getMetadata().getAnnotations().get("workflows.argoproj.io/node-name")) + .toList(); + Assertions.assertIterableEquals(expectedNodeNames.stream().sorted().toList(), argoNodeNames.stream().sorted().toList(), argoNodeNames.toString()); + } + + private static void waitForEndpoints(Resource endpoints) { + TestUtils.waitFor("pipelines svc to come up", TestConstants.GLOBAL_POLL_INTERVAL_SHORT, TestConstants.GLOBAL_TIMEOUT, () -> { + try { + Endpoints endpointset = endpoints.get(); + if (endpointset == null) { + return false; + } + List subsets = endpointset.getSubsets(); + if (subsets.isEmpty()) { + return false; + } + for (EndpointSubset subset : subsets) { + return !subset.getAddresses().isEmpty(); + } + } catch (KubernetesClientException e) { + if (e.getCode() == 404) { + return false; + } + throw e; + } + return false; + }); + } +} diff --git a/src/test/resources/pipelines/better_v2.yaml b/src/test/resources/pipelines/better_v2.yaml new file mode 100644 index 00000000..b6279890 --- /dev/null +++ b/src/test/resources/pipelines/better_v2.yaml @@ -0,0 +1,227 @@ +# PIPELINE DEFINITION +# Name: iris-training-pipeline +# Inputs: +# min_max_scaler: bool +# neighbors: int +# standard_scaler: bool +components: + comp-create-dataset: + executorLabel: exec-create-dataset + outputDefinitions: + artifacts: + iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-normalize-dataset: + executorLabel: exec-normalize-dataset + inputDefinitions: + artifacts: + input_iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + min_max_scaler: + parameterType: BOOLEAN + standard_scaler: + parameterType: BOOLEAN + outputDefinitions: + artifacts: + normalized_iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-train-model: + executorLabel: exec-train-model + inputDefinitions: + artifacts: + normalized_iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + n_neighbors: + parameterType: NUMBER_INTEGER + outputDefinitions: + artifacts: + model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-create-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_dataset(iris_dataset: Output[Dataset]):\n import pandas\ + \ as pd\n\n csv_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'\n\ + \ col_names = [\n 'Sepal_Length', 'Sepal_Width', 'Petal_Length',\ + \ 'Petal_Width', 'Labels'\n ]\n df = pd.read_csv(csv_url, names=col_names)\n\ + \n with open(iris_dataset.path, 'w') as f:\n df.to_csv(f)\n\n" + image: quay.io/opendatahub/ds-pipelines-sample-base:v1.0 + exec-normalize-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - normalize_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\ + \ 'scikit-learn==1.4.0' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef normalize_dataset(\n input_iris_dataset: Input[Dataset],\n\ + \ normalized_iris_dataset: Output[Dataset],\n standard_scaler: bool,\n\ + \ min_max_scaler: bool,\n):\n if standard_scaler is min_max_scaler:\n\ + \ raise ValueError(\n 'Exactly one of standard_scaler\ + \ or min_max_scaler must be True.')\n\n import pandas as pd\n from\ + \ sklearn.preprocessing import MinMaxScaler\n from sklearn.preprocessing\ + \ import StandardScaler\n\n with open(input_iris_dataset.path) as f:\n\ + \ df = pd.read_csv(f)\n labels = df.pop('Labels')\n\n if standard_scaler:\n\ + \ scaler = StandardScaler()\n if min_max_scaler:\n scaler\ + \ = MinMaxScaler()\n\n df = pd.DataFrame(scaler.fit_transform(df))\n\ + \ df['Labels'] = labels\n normalized_iris_dataset.metadata['state']\ + \ = \"Normalized\"\n with open(normalized_iris_dataset.path, 'w') as\ + \ f:\n df.to_csv(f)\n\n" + image: quay.io/opendatahub/ds-pipelines-sample-base:v1.0 + exec-train-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_model + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\ + \ 'scikit-learn==1.4.0' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_model(\n normalized_iris_dataset: Input[Dataset],\n\ + \ model: Output[Model],\n n_neighbors: int,\n):\n import pickle\n\ + \n import pandas as pd\n from sklearn.model_selection import train_test_split\n\ + \ from sklearn.neighbors import KNeighborsClassifier\n\n with open(normalized_iris_dataset.path)\ + \ as f:\n df = pd.read_csv(f)\n\n y = df.pop('Labels')\n X\ + \ = df\n\n X_train, X_test, y_train, y_test = train_test_split(X, y,\ + \ random_state=0)\n\n clf = KNeighborsClassifier(n_neighbors=n_neighbors)\n\ + \ clf.fit(X_train, y_train)\n\n model.metadata['framework'] = 'scikit-learn'\n\ + \ with open(model.path, 'wb') as f:\n pickle.dump(clf, f)\n\n" + + image: quay.io/opendatahub/ds-pipelines-sample-base:v1.0 +pipelineInfo: + name: iris-training-pipeline +root: + dag: + tasks: + create-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-create-dataset + taskInfo: + name: create-dataset + normalize-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-normalize-dataset + dependentTasks: + - create-dataset + inputs: + artifacts: + input_iris_dataset: + taskOutputArtifact: + outputArtifactKey: iris_dataset + producerTask: create-dataset + parameters: + min_max_scaler: + runtimeValue: + constant: false + standard_scaler: + runtimeValue: + constant: true + taskInfo: + name: normalize-dataset + train-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-train-model + dependentTasks: + - normalize-dataset + inputs: + artifacts: + normalized_iris_dataset: + taskOutputArtifact: + outputArtifactKey: normalized_iris_dataset + producerTask: normalize-dataset + parameters: + n_neighbors: + componentInputParameter: neighbors + taskInfo: + name: train-model + inputDefinitions: + parameters: + min_max_scaler: + parameterType: BOOLEAN + neighbors: + parameterType: NUMBER_INTEGER + standard_scaler: + parameterType: BOOLEAN +schemaVersion: 2.1.0 +sdkVersion: kfp-2.7.0