Skip to content

Commit

Permalink
added some unit tests for gobblin temporal module
Browse files Browse the repository at this point in the history
  • Loading branch information
Aditya Pratap Singh committed Sep 24, 2024
1 parent 72306bb commit 3950d10
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 8 deletions.
5 changes: 4 additions & 1 deletion gobblin-temporal/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ dependencies {
testCompile project(":gobblin-example")

testCompile externalDependency.testng
testCompile externalDependency.mockito
testCompile externalDependency.mockitoInline
testCompile externalDependency.powerMockApi
testCompile externalDependency.powerMockModule
testCompile externalDependency.hadoopYarnMiniCluster
testCompile externalDependency.curatorFramework
testCompile externalDependency.curatorTest


testCompile ('com.google.inject:guice:3.0') {
force = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;

import com.google.common.annotations.VisibleForTesting;


import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;

Expand Down Expand Up @@ -115,8 +118,9 @@ protected NestingExecWorkflow<WORK_ITEM> createChildWorkflow(final WorkflowAddr
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
}

@VisibleForTesting
/** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */
protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) {
public Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) {
// (only pause when an appreciable number of leaves)
// TODO: use a configuration value, for simpler adjustment, rather than hard-code
return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT
Expand All @@ -130,11 +134,9 @@ protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChi
* List<Integer> naiveUniformity = Collections.nCopies(numSubTreesPerSubTree, numSubTreeChildren);
* @return each sub-tree's desired size, in ascending sub-tree order
*/
protected static List<Integer> consolidateSubTreeGrandChildren(
final int numSubTreesPerSubTree,
final int numChildrenTotal,
final int numSubTreeChildren
) {
@VisibleForTesting
public static List<Integer> consolidateSubTreeGrandChildren(final int numSubTreesPerSubTree,
final int numChildrenTotal, final int numSubTreeChildren) {
if (numSubTreesPerSubTree <= 0) {
return Lists.newArrayList();
} else if (isSqrt(numSubTreeChildren, numChildrenTotal)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,4 @@ public void testFetchesUniqueWorkDirsFromMultiWorkUnits() {
Set<String> output = GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
Assert.assertEquals(output.size(), 11);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.temporal.ddm.utils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;

import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;


public class JobStateUtilTest {

private JobState jobState;
private FileSystem fileSystem;

@BeforeMethod
public void setUp() {
jobState = Mockito.mock(JobState.class);
fileSystem = Mockito.mock(FileSystem.class);
}

@Test
public void testOpenFileSystem() throws IOException {

Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
Mockito.when(jobState.getProperties()).thenReturn(new Properties());

FileSystem fs = JobStateUtils.openFileSystem(jobState);

Assert.assertNotNull(fs);
Mockito.verify(jobState,Mockito.times(1)).getProp(Mockito.anyString(), Mockito.anyString());
}

@Test
public void testCreateSource() throws ReflectiveOperationException {
Mockito.when(jobState.getProp(Mockito.anyString()))
.thenReturn("org.apache.gobblin.source.extractor.filebased.TextFileBasedSource");
Source<?, ?> source = JobStateUtils.createSource(jobState);
Assert.assertNotNull(source);
}

@Test
public void testOpenTaskStateStoreUncached() throws URISyntaxException {
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("file:///test");
Mockito.when(jobState.getJobId()).thenReturn("testJobId");
Mockito.when(jobState.getJobName()).thenReturn("testJobName");
Mockito.when(fileSystem.makeQualified(Mockito.any()))
.thenReturn(new Path("file:///test/testJobName/testJobId/output"));
Mockito.when(fileSystem.getUri()).thenReturn(new URI("file:///test/testJobName/testJobId/output"));

StateStore<TaskState> stateStore = JobStateUtils.openTaskStateStoreUncached(jobState, fileSystem);

Assert.assertNotNull(stateStore);
}

@Test
public void testGetFileSystemUri() {
Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
URI fsUri = JobStateUtils.getFileSystemUri(jobState);
Assert.assertEquals(URI.create("file:///test"), fsUri);
Mockito.verify(jobState).getProp(Mockito.anyString(), Mockito.anyString());
}

@Test
public void testGetWorkDirRoot() {
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Path rootPath = JobStateUtils.getWorkDirRoot(jobState);
Assert.assertEquals(new Path("/tmp/testJob/jobId123"), rootPath);
Mockito.verify(jobState, Mockito.times(1)).getProp(Mockito.anyString());
}

@Test
public void testGetWorkUnitsPath() {
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Path workUnitsPath = JobStateUtils.getWorkUnitsPath(jobState);
Assert.assertEquals(new Path("/tmp/testJob/jobId123/input"), workUnitsPath);
}

@Test
public void testGetTaskStateStorePath() throws IOException {
Mockito.when(fileSystem.makeQualified(Mockito.any(Path.class))).thenReturn(new Path("/qualified/path"));
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fileSystem);
Assert.assertEquals(new Path("/qualified/path"), taskStateStorePath);
}

@Test
public void testWriteJobState() throws IOException {
Path workDirRootPath = new Path("/tmp");
FSDataOutputStream dos = Mockito.mock(FSDataOutputStream.class);
Mockito.when(fileSystem.create(Mockito.any(Path.class))).thenReturn(dos);

JobStateUtils.writeJobState(jobState, workDirRootPath, fileSystem);

Mockito.verify(fileSystem).create(Mockito.any(Path.class));
Mockito.verify(jobState).write(Mockito.any(DataOutputStream.class), Mockito.anyBoolean(), Mockito.anyBoolean());
}

@Test
public void testGetSharedResourcesBroker() {
Mockito.when(jobState.getProperties()).thenReturn(System.getProperties());
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Assert.assertNotNull(JobStateUtils.getSharedResourcesBroker(jobState));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.workflow.impl;

import java.time.Duration;
import java.util.List;
import java.util.Optional;

import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;

import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;


@RunWith(PowerMockRunner.class)
@PrepareForTest(Workflow.class)
public class AbstractNestingExecWorkflowImplTest {

@Mock
private Workload<String> mockWorkload;

@Mock
private WorkflowAddr mockWorkflowAddr;

@Mock
private Workload.WorkSpan<String> mockWorkSpan;

@Mock
private Promise<Object> mockPromise;

private AbstractNestingExecWorkflowImpl<String, Object> workflow;

@BeforeClass
public void setup() {
// PowerMockito is required to mock static methods in the Workflow class
Mockito.mockStatic(Workflow.class);
Mockito.mockStatic(Async.class);
Mockito.mockStatic(Promise.class);
this.mockWorkload = Mockito.mock(Workload.class);
this.mockWorkflowAddr = Mockito.mock(WorkflowAddr.class);
this.mockWorkSpan = Mockito.mock(Workload.WorkSpan.class);
this.mockPromise = Mockito.mock(Promise.class);

workflow = new AbstractNestingExecWorkflowImpl<String, Object>() {
@Override
protected Promise<Object> launchAsyncActivity(String task) {
return mockPromise;
}
};
}

@Test
public void testPerformWorkload_NoWorkSpan() {
// Arrange
Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.empty());

// Act
int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty());

// Assert
Assert.assertEquals(0, result);
Mockito.verify(mockWorkload, Mockito.times(2)).getSpan(0, 5);
}

@Test
public void testCalcPauseDurationBeforeCreatingSubTree_NoPause() {
// Act
Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(50);

// Assert
Assert.assertEquals(Duration.ZERO, result);
}

@Test
public void testCalcPauseDurationBeforeCreatingSubTree_PauseRequired() {
// Act
Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(150);

// Assert
Assert.assertEquals(
Duration.ofSeconds(AbstractNestingExecWorkflowImpl.NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT),
result);
}

@Test
public void testConsolidateSubTreeGrandChildren() {
// Act
List<Integer> result = AbstractNestingExecWorkflowImpl.consolidateSubTreeGrandChildren(3, 10, 2);

// Assert
Assert.assertEquals(3, result.size());
Assert.assertEquals(Integer.valueOf(0), result.get(0));
Assert.assertEquals(Integer.valueOf(0), result.get(1));
Assert.assertEquals(Integer.valueOf(6), result.get(2));
}

@Test(expectedExceptions = AssertionError.class)
public void testPerformWorkload_LaunchesChildWorkflows() {
// Arrange
Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.of(mockWorkSpan));
Mockito.when(mockWorkSpan.getNumElems()).thenReturn(5);
Mockito.when(mockWorkSpan.next()).thenReturn("task1");
Mockito.when(mockWorkload.isIndexKnownToExceed(Mockito.anyInt())).thenReturn(false);

// Mock the child workflow
Mockito.when(Async.function(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.anyInt(),
Mockito.anyInt(), Mockito.any())).thenReturn(mockPromise);
Mockito.when(mockPromise.get()).thenReturn(5);
// Act
int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty());
}
}

0 comments on commit 3950d10

Please sign in to comment.