Skip to content

Commit

Permalink
HADOOP-19164. Hadoop CLI MiniCluster is broken (#7050). Contributed b…
Browse files Browse the repository at this point in the history
…y Ayush Saxena.

Reviewed-by: Vinayakumar B <[email protected]>
  • Loading branch information
ayushtkn authored Sep 21, 2024
1 parent 6bcc254 commit 28538d6
Show file tree
Hide file tree
Showing 13 changed files with 151 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,16 @@ You should be able to obtain the Hadoop tarball from the release. Also, you can
$ mvn clean install -DskipTests
$ mvn package -Pdist -Dtar -DskipTests -Dmaven.javadoc.skip

**NOTE:** You will need [protoc 2.5.0](http://code.google.com/p/protobuf/) installed.

The tarball should be available in `hadoop-dist/target/` directory.

Running the MiniCluster
-----------------------

From inside the root directory of the extracted tarball, you can start the CLI MiniCluster using the following command:

$ bin/mapred minicluster -rmport RM_PORT -jhsport JHS_PORT
$ bin/mapred minicluster -format

In the example command above, `RM_PORT` and `JHS_PORT` should be replaced by the user's choice of these port numbers. If not specified, random free ports will be used.
The format option is required when running the minicluster for the first time, from next time -format option isn't required.

There are a number of command line arguments that the users can use to control which services to start, and to pass other configuration properties. The available command line arguments:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
Expand Down Expand Up @@ -201,7 +202,7 @@ public void testServerDefaultsWithCaching()
cluster.waitActive();
// Set a spy namesystem inside the namenode and return it
FSNamesystem spyNamesystem =
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
NameNodeAdapterMockitoUtil.spyOnNamesystem(cluster.getNameNode());
InetSocketAddress nameNodeAddr = cluster.getNameNode().getNameNodeAddress();
try {
// Create a dfs client and set a long enough validity interval
Expand Down Expand Up @@ -252,7 +253,7 @@ public void testServerDefaultsWithMinimalCaching() throws Exception {
cluster.waitActive();
// Set a spy namesystem inside the namenode and return it
FSNamesystem spyNamesystem =
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
NameNodeAdapterMockitoUtil.spyOnNamesystem(cluster.getNameNode());
InetSocketAddress nameNodeAddr = cluster.getNameNode().getNameNodeAddress();
try {
// Create a dfs client and set a minimal validity interval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
import org.apache.hadoop.test.MockitoUtil;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
Expand Down Expand Up @@ -297,7 +297,8 @@ public void testGetBlockLocationsOnlyUsesReadLock() throws IOException {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.build();
ReentrantReadWriteLock spyLock = NameNodeAdapter.spyOnFsLock(cluster.getNamesystem());
ReentrantReadWriteLock spyLock =
NameNodeAdapterMockitoUtil.spyOnFsLock(cluster.getNamesystem());
try {
// Create empty file in the FSN.
Path p = new Path("/empty-file");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.IOUtils;
Expand Down Expand Up @@ -1877,7 +1877,7 @@ public Void run() throws Exception {
}

private void spyFSNamesystem(NameNode nn) throws IOException {
FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
FSNamesystem fsnSpy = NameNodeAdapterMockitoUtil.spyOnNamesystem(nn);
doAnswer(new Answer<BlocksWithLocations>() {
@Override
public BlocksWithLocations answer(InvocationOnMock invocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
Expand Down Expand Up @@ -259,7 +259,7 @@ private void testBalancerWithObserver(boolean withObserverFailure)
List<FSNamesystem> namesystemSpies = new ArrayList<>();
for (int i = 0; i < cluster.getNumNameNodes(); i++) {
namesystemSpies.add(
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode(i)));
NameNodeAdapterMockitoUtil.spyOnNamesystem(cluster.getNameNode(i)));
}
if (withObserverFailure) {
// First observer NN is at index 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,15 @@

import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
Expand All @@ -47,7 +41,6 @@
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
Expand All @@ -57,11 +50,6 @@
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.Whitebox;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import static org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.FSIMAGE_ATTRIBUTE_KEY;

Expand Down Expand Up @@ -269,97 +257,6 @@ public static BlockInfo getStoredBlock(final FSNamesystem fsn,
return fsn.getStoredBlock(b);
}

public static FSNamesystem spyOnNamesystem(NameNode nn) {
FSNamesystem fsnSpy = Mockito.spy(nn.getNamesystem());
FSNamesystem fsnOld = nn.namesystem;
fsnOld.writeLock();
fsnSpy.writeLock();
nn.namesystem = fsnSpy;
try {
FieldUtils.writeDeclaredField(
(NameNodeRpcServer)nn.getRpcServer(), "namesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
fsnSpy.getBlockManager(), "namesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
fsnSpy.getLeaseManager(), "fsnamesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
fsnSpy.getBlockManager().getDatanodeManager(),
"namesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
BlockManagerTestUtil.getHeartbeatManager(fsnSpy.getBlockManager()),
"namesystem", fsnSpy, true);
} catch (IllegalAccessException e) {
throw new RuntimeException("Cannot set spy FSNamesystem", e);
} finally {
fsnSpy.writeUnlock();
fsnOld.writeUnlock();
}
return fsnSpy;
}

public static BlockManager spyOnBlockManager(NameNode nn) {
BlockManager bmSpy = Mockito.spy(nn.getNamesystem().getBlockManager());
nn.getNamesystem().setBlockManagerForTesting(bmSpy);
return bmSpy;
}

public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) {
ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests());
fsn.setFsLockForTests(spy);
return spy;
}

public static FSImage spyOnFsImage(NameNode nn1) {
FSNamesystem fsn = nn1.getNamesystem();
FSImage spy = Mockito.spy(fsn.getFSImage());
Whitebox.setInternalState(fsn, "fsImage", spy);
return spy;
}

public static FSEditLog spyOnEditLog(NameNode nn) {
FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
if (tailer != null) {
tailer.setEditLog(spyEditLog);
}
return spyEditLog;
}

/**
* Spy on EditLog to delay execution of doEditTransaction() for MkdirOp.
*/
public static FSEditLog spyDelayMkDirTransaction(
final NameNode nn, final long delay) {
FSEditLog realEditLog = nn.getFSImage().getEditLog();
FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog);
DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
Answer<Boolean> ans = new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(delay);
return (Boolean) invocation.callRealMethod();
}
};
ArgumentMatcher<FSEditLogOp> am = new ArgumentMatcher<FSEditLogOp>() {
@Override
public boolean matches(FSEditLogOp argument) {
FSEditLogOp op = (FSEditLogOp) argument;
return op.opCode == FSEditLogOpCodes.OP_MKDIR;
}
};
doAnswer(ans).when(spyEditLog).doEditTransaction(
ArgumentMatchers.argThat(am));
return spyEditLog;
}

public static JournalSet spyOnJournalSet(NameNode nn) {
FSEditLog editLog = nn.getFSImage().getEditLog();
JournalSet js = Mockito.spy(editLog.getJournalSet());
editLog.setJournalSetForTesting(js);
return js;
}

public static String getMkdirOpPath(FSEditLogOp op) {
if (op.opCode == FSEditLogOpCodes.OP_MKDIR) {
return ((MkdirOp) op).path;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.namenode;

import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer;

import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.test.Whitebox;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

/**
* This is a Mockito based utility class to expose NameNode functionality for unit tests.
*/
public final class NameNodeAdapterMockitoUtil {

private NameNodeAdapterMockitoUtil() {
}

public static BlockManager spyOnBlockManager(NameNode nn) {
BlockManager bmSpy = spy(nn.getNamesystem().getBlockManager());
nn.getNamesystem().setBlockManagerForTesting(bmSpy);
return bmSpy;
}

public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) {
ReentrantReadWriteLock spy = spy(fsn.getFsLockForTests());
fsn.setFsLockForTests(spy);
return spy;
}

public static FSImage spyOnFsImage(NameNode nn1) {
FSNamesystem fsn = nn1.getNamesystem();
FSImage spy = spy(fsn.getFSImage());
Whitebox.setInternalState(fsn, "fsImage", spy);
return spy;
}

public static JournalSet spyOnJournalSet(NameNode nn) {
FSEditLog editLog = nn.getFSImage().getEditLog();
JournalSet js = spy(editLog.getJournalSet());
editLog.setJournalSetForTesting(js);
return js;
}

public static FSNamesystem spyOnNamesystem(NameNode nn) {
FSNamesystem fsnSpy = spy(nn.getNamesystem());
FSNamesystem fsnOld = nn.namesystem;
fsnOld.writeLock();
fsnSpy.writeLock();
nn.namesystem = fsnSpy;
try {
FieldUtils.writeDeclaredField(nn.getRpcServer(), "namesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
fsnSpy.getBlockManager(), "namesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
fsnSpy.getLeaseManager(), "fsnamesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
fsnSpy.getBlockManager().getDatanodeManager(),
"namesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
BlockManagerTestUtil.getHeartbeatManager(fsnSpy.getBlockManager()),
"namesystem", fsnSpy, true);
} catch (IllegalAccessException e) {
throw new RuntimeException("Cannot set spy FSNamesystem", e);
} finally {
fsnSpy.writeUnlock();
fsnOld.writeUnlock();
}
return fsnSpy;
}

public static FSEditLog spyOnEditLog(NameNode nn) {
FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
if (tailer != null) {
tailer.setEditLog(spyEditLog);
}
return spyEditLog;
}

/**
* Spy on EditLog to delay execution of doEditTransaction() for MkdirOp.
*/
public static FSEditLog spyDelayMkDirTransaction(
final NameNode nn, final long delay) {
FSEditLog realEditLog = nn.getFSImage().getEditLog();
FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog);
DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
Answer<Boolean> ans = invocation -> {
Thread.sleep(delay);
return (Boolean) invocation.callRealMethod();
};
ArgumentMatcher<FSEditLogOp> am = argument -> argument.opCode == FSEditLogOpCodes.OP_MKDIR;
doAnswer(ans).when(spyEditLog).doEditTransaction(ArgumentMatchers.argThat(am));
return spyEditLog;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.junit.After;
Expand Down Expand Up @@ -336,7 +337,7 @@ public void testFailureToReadEditsOnTransitionToActive() throws Exception {
}

private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException {
FSEditLog spyEditLog = NameNodeAdapter.spyOnEditLog(nn1);
FSEditLog spyEditLog = NameNodeAdapterMockitoUtil.spyOnEditLog(nn1);
LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
doAnswer(answer).when(spyEditLog).selectInputStreams(
anyLong(), anyLong(), any(), anyBoolean(), anyBoolean());
Expand Down
Loading

0 comments on commit 28538d6

Please sign in to comment.