Skip to content

Commit

Permalink
Enable pipeline packages with multiple files (#939)
Browse files Browse the repository at this point in the history
* Enable pipeline packages with multiple files

* Added tests

* Initialize the variables to nil

* Trying to read the archive file entry immediately

* Fixed the pipeline packages used by the `TestPipelineAPI` test.
Also added a failing test case. Will disable it in next commit.

* Disabling the test for the UploadFile bug I've discovered

* Fixed the pipeline name.

* Removed the disabled extra test.

* Addressed the feedback.

* Removed the "header == nil" check (feedback).

* Fixed typo

* Addressed the PR feedback
Added space before comment.
Checking for the error again.
  • Loading branch information
Ark-kun authored and k8s-ci-robot committed Mar 28, 2019
1 parent 825f64d commit 2c2445d
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 11 deletions.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Component definition: runs `cowsay` in the docker/whalesay image, passing
# the two declared inputs as command-line arguments.
name: whalesay
inputs:
- name: param1
- name: param2
implementation:
  container:
    image: docker/whalesay:latest
    command: [cowsay]
    # Each placeholder is replaced by the value of the named input.
    args: [{inputValue: param1}, {inputValue: param2}]
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Argo Workflow with a single `whalesay` template that prints
# "<param1>-<param2>" via cowsay.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: arguments-parameters-
spec:
  entrypoint: whalesay
  arguments:
    parameters:
    - name: param1
      value: hello
    # param2 is declared without a value.
    - name: param2

  templates:
  - name: whalesay
    inputs:
      parameters:
      - name: param1
      - name: param2
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.param1}}-{{inputs.parameters.param2}}"]
Binary file not shown.
Binary file not shown.
50 changes: 45 additions & 5 deletions backend/src/apiserver/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func isYamlFile(fileName string) bool {
return strings.HasSuffix(fileName, ".yaml") || strings.HasSuffix(fileName, ".yml")
}

// isPipelineYamlFile reports whether fileName is exactly "pipeline.yaml".
// The comparison is case-sensitive and does not strip any directory prefix,
// so entries such as "dir/pipeline.yaml" do not match.
func isPipelineYamlFile(fileName string) bool {
	const pipelineFileName = "pipeline.yaml"
	if fileName != pipelineFileName {
		return false
	}
	return true
}

// isZipFile reports whether compressedFile starts with the zip signature
// "PK" (bytes 0x50 0x4B). Inputs of two bytes or fewer are never treated as
// zip data, even if they consist of exactly the signature.
func isZipFile(compressedFile []byte) bool {
	if len(compressedFile) <= 2 {
		return false
	}
	first, second := compressedFile[0], compressedFile[1]
	return first == 'P' && second == 'K'
}
Expand All @@ -75,13 +79,38 @@ func DecompressPipelineTarball(compressedFile []byte) ([]byte, error) {
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the tarball file. Not a valid tarball file.")
}
// New behavior: searching for the "pipeline.yaml" file.
tarReader := tar.NewReader(gzipReader)
for {
header, err := tarReader.Next()
if err == io.EOF {
tarReader = nil
break
}
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the tarball file. Not a valid tarball file.")
}
if isPipelineYamlFile(header.Name) {
//Found the pipeline file.
break
}
}
// Old behavior - taking the first file in the archive
if tarReader == nil {
// Resetting the reader
gzipReader, err = gzip.NewReader(bytes.NewReader(compressedFile))
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the tarball file. Not a valid tarball file.")
}
tarReader = tar.NewReader(gzipReader)
}

header, err := tarReader.Next()
if err != nil || header == nil {
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the tarball file. Not a valid tarball file.")
}
if !isYamlFile(header.Name) {
return nil, util.NewInvalidInputError("Error extracting pipeline from the tarball file. Expecting a YAML file inside the tarball. Got: %v", header.Name)
return nil, util.NewInvalidInputError("Error extracting pipeline from the tarball file. Expecting a pipeline.yaml file inside the tarball. Got: %v", header.Name)
}
decompressedFile, err := ioutil.ReadAll(tarReader)
if err != nil {
Expand All @@ -98,10 +127,21 @@ func DecompressPipelineZip(compressedFile []byte) ([]byte, error) {
if len(reader.File) < 1 {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Empty zip file.")
}
if !isYamlFile(reader.File[0].Name) {
return nil, util.NewInvalidInputError("Error extracting pipeline from the zip file. Expecting a YAML file inside the zip. Got: %v", reader.File[0].Name)

// Old behavior - taking the first file in the archive
pipelineYamlFile := reader.File[0]
// New behavior: searching for the "pipeline.yaml" file.
for _, file := range reader.File {
if isPipelineYamlFile(file.Name) {
pipelineYamlFile = file
break
}
}

if !isYamlFile(pipelineYamlFile.Name) {
return nil, util.NewInvalidInputError("Error extracting pipeline from the zip file. Expecting a pipeline.yaml file inside the zip. Got: %v", pipelineYamlFile.Name)
}
rc, err := reader.File[0].Open()
rc, err := pipelineYamlFile.Open()
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Failed to read the content.")
}
Expand Down
18 changes: 18 additions & 0 deletions backend/src/apiserver/server/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ func TestReadPipelineFile_Zip_AnyExtension(t *testing.T) {
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

// TestReadPipelineFile_MultifileZip checks that ReadPipelineFile, given the
// pipeline_plus_component.zip fixture, returns bytes equal to the contents of
// the fixture directory's pipeline.yaml.
//
// NOTE(review): the upload name uses the ".ai-hub-package" extension,
// presumably so the archive format is detected from content rather than from
// the file name — confirm against ReadPipelineFile.
func TestReadPipelineFile_MultifileZip(t *testing.T) {
	file, err := os.Open("test/pipeline_plus_component/pipeline_plus_component.zip")
	if err != nil {
		// Fail fast: without the fixture the assertions below would only
		// report a misleading extraction error.
		t.Fatalf("opening zip fixture: %v", err)
	}
	defer file.Close()

	pipelineFile, err := ReadPipelineFile("pipeline_plus_component.ai-hub-package", file, MaxFileLength)
	assert.Nil(t, err)

	expectedPipelineFile, err := ioutil.ReadFile("test/pipeline_plus_component/pipeline.yaml")
	if err != nil {
		t.Fatalf("reading expected pipeline.yaml: %v", err)
	}
	assert.Equal(t, expectedPipelineFile, pipelineFile)
}

func TestReadPipelineFile_Tarball(t *testing.T) {
file, _ := os.Open("test/arguments_tarball/arguments.tar.gz")
pipelineFile, err := ReadPipelineFile("arguments.tar.gz", file, MaxFileLength)
Expand All @@ -166,6 +175,15 @@ func TestReadPipelineFile_Tarball_AnyExtension(t *testing.T) {
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

// TestReadPipelineFile_MultifileTarball checks that ReadPipelineFile, given
// the pipeline_plus_component.tar.gz fixture, returns bytes equal to the
// contents of the fixture directory's pipeline.yaml.
//
// NOTE(review): the upload name uses the ".ai-hub-package" extension,
// presumably so the archive format is detected from content rather than from
// the file name — confirm against ReadPipelineFile.
func TestReadPipelineFile_MultifileTarball(t *testing.T) {
	file, err := os.Open("test/pipeline_plus_component/pipeline_plus_component.tar.gz")
	if err != nil {
		// Fail fast: without the fixture the assertions below would only
		// report a misleading extraction error.
		t.Fatalf("opening tarball fixture: %v", err)
	}
	defer file.Close()

	pipelineFile, err := ReadPipelineFile("pipeline_plus_component.ai-hub-package", file, MaxFileLength)
	assert.Nil(t, err)

	expectedPipelineFile, err := ioutil.ReadFile("test/pipeline_plus_component/pipeline.yaml")
	if err != nil {
		t.Fatalf("reading expected pipeline.yaml: %v", err)
	}
	assert.Equal(t, expectedPipelineFile, pipelineFile)
}

func TestReadPipelineFile_UnknownFileFormat(t *testing.T) {
file, _ := os.Open("test/unknown_format.foo")
_, err := ReadPipelineFile("unknown_format.foo", file, MaxFileLength)
Expand Down
12 changes: 6 additions & 6 deletions backend/test/integration/pipeline_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,17 @@ func (s *PipelineApiTest) TestPipelineAPI() {
/* ---------- Upload pipelines zip ---------- */
time.Sleep(1 * time.Second)
argumentUploadPipeline, err := s.pipelineUploadClient.UploadFile(
"../resources/zip-arguments.zip", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
"../resources/arguments.pipeline.zip", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
assert.Nil(t, err)
assert.Equal(t, "zip-arguments-parameters", argumentUploadPipeline.Name)

/* ---------- Import pipeline tarball by URL ---------- */
time.Sleep(1 * time.Second)
argumentUrlPipeline, err := s.pipelineClient.Create(&params.CreatePipelineParams{
Body: &pipeline_model.APIPipeline{URL: &pipeline_model.APIURL{
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.zip"}}})
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.pipeline.zip"}}})
assert.Nil(t, err)
assert.Equal(t, "arguments.zip", argumentUrlPipeline.Name)
assert.Equal(t, "arguments.pipeline.zip", argumentUrlPipeline.Name)

/* ---------- Verify list pipeline works ---------- */
pipelines, totalSize, _, err := s.pipelineClient.List(params.NewListPipelinesParams())
Expand All @@ -111,7 +111,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listFirstPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments-parameters.yaml", listFirstPagePipelines[0].Name)
assert.Equal(t, "arguments.zip", listFirstPagePipelines[1].Name)
assert.Equal(t, "arguments.pipeline.zip", listFirstPagePipelines[1].Name)
assert.NotEmpty(t, nextPageToken)

listSecondPagePipelines, totalSize, nextPageToken, err := s.pipelineClient.List(
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "zip-arguments-parameters", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[1].Name)
assert.Equal(t, "arguments.pipeline.zip", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

/* ---------- List pipelines sort by unsupported description field. Should fail. ---------- */
Expand All @@ -162,7 +162,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Nil(t, err)
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.pipeline.zip", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments-parameters.yaml", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

Expand Down
File renamed without changes.
Binary file added backend/test/resources/arguments.pipeline.zip
Binary file not shown.

0 comments on commit 2c2445d

Please sign in to comment.