Skip to content

Commit

Permalink
Revert "HBASE-26249 Ameliorate compaction made by bulk-loading files (apache#3831)"
Browse files Browse the repository at this point in the history
This reverts commit 5e62e2a.
  • Loading branch information
sunhelly committed Nov 23, 2021
1 parent 5e62e2a commit 2aafd31
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
Expand Down Expand Up @@ -94,7 +92,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
private volatile ThreadPoolExecutor splits;

private volatile ThroughputController compactionThroughputController;
private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();

private volatile boolean compactionsEnabled;
/**
Expand All @@ -116,15 +113,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
CompactionThroughputControllerFactory.create(server, conf);
}

// only for test
public CompactSplit(Configuration conf) {
this.server = null;
this.conf = conf;
this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
createCompactionExecutors();
createSplitExcecutors();
}

private void createSplitExcecutors() {
final String n = Thread.currentThread().getName();
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
Expand Down Expand Up @@ -250,8 +238,7 @@ private void reInitializeCompactionsExecutors() {
createCompactionExecutors();
}

// set protected for test
protected interface CompactionCompleteTracker {
private interface CompactionCompleteTracker {

default void completed(Store store) {
}
Expand Down Expand Up @@ -329,8 +316,7 @@ private void requestCompactionInternal(HRegion region, String why, int priority,
}
}

// set protected for test
protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException {
if (this.server.isStopped() || (region.getTableDescriptor() != null &&
Expand Down Expand Up @@ -378,12 +364,6 @@ protected void requestCompactionInternal(HRegion region, HStore store, String wh
}
pool.execute(
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
if (LOG.isDebugEnabled()) {
LOG.debug("Add compact mark for store {}, priority={}, current under compaction "
+ "store size is {}", getStoreNameForUnderCompaction(store), priority,
underCompactionStores.size());
}
underCompactionStores.add(getStoreNameForUnderCompaction(store));
region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large ";
Expand All @@ -397,18 +377,8 @@ public synchronized void requestSystemCompaction(HRegion region, String why) thr
DUMMY_COMPLETE_TRACKER, null);
}

public void requestSystemCompaction(HRegion region, HStore store, String why)
public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
throws IOException {
requestSystemCompaction(region, store, why, false);
}

public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
boolean giveUpIfRequestedOrCompacting) throws IOException {
if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
store.getColumnFamilyName());
return;
}
requestCompactionInternal(region, store, why, NO_PRIORITY, false,
CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
}
Expand Down Expand Up @@ -501,13 +471,6 @@ public int getRegionSplitLimit() {
return this.regionSplitLimit;
}

/**
* Check if this store is under compaction
*/
public boolean isUnderCompaction(final HStore s) {
return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
}

private static final Comparator<Runnable> COMPARATOR =
new Comparator<Runnable>() {

Expand Down Expand Up @@ -687,22 +650,13 @@ private void doCompaction(User user) {

@Override
public void run() {
try {
Preconditions.checkNotNull(server);
if (server.isStopped() || (region.getTableDescriptor() != null &&
!region.getTableDescriptor().isCompactionEnabled())) {
region.decrementCompactionsQueuedCount();
return;
}
doCompaction(user);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("Remove under compaction mark for store: {}",
store.getHRegion().getRegionInfo().getEncodedName() + ":" + store
.getColumnFamilyName());
}
underCompactionStores.remove(getStoreNameForUnderCompaction(store));
Preconditions.checkNotNull(server);
if (server.isStopped() || (region.getTableDescriptor() != null &&
!region.getTableDescriptor().isCompactionEnabled())) {
region.decrementCompactionsQueuedCount();
return;
}
doCompaction(user);
}

private String formatStackTrace(Exception ex) {
Expand Down Expand Up @@ -870,10 +824,4 @@ ThreadPoolExecutor getShortCompactions() {
return shortCompactions;
}

private String getStoreNameForUnderCompaction(HStore store) {
return String.format("%s:%s",
store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
store.getColumnFamilyName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span;
import java.io.EOFException;
Expand Down Expand Up @@ -69,7 +68,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -179,7 +177,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
Expand Down Expand Up @@ -7053,10 +7050,11 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
HStore store = getStore(family);
try {
if (this.rsServices != null && store.needsCompaction()) {
this.rsServices.getCompactionRequestor().requestSystemCompaction(this, store,
"bulkload hfiles request compaction", true);
LOG.info("Request compaction for region {} family {} after bulk load",
this.getRegionInfo().getEncodedName(), store.getColumnFamilyName());
this.rsServices.getCompactionRequestor().requestCompaction(this, store,
"bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
CompactionLifeCycleTracker.DUMMY, null);
LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
this.getRegionInfo(), family);
}
} catch (IOException e) {
LOG.error("bulkload hfiles request compaction error ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ void requestCompaction(HRegion region, String why, int priority,
void requestCompaction(HRegion region, HStore store, String why, int priority,
CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;

/**
* Request system compaction on the given store.
*/
void requestSystemCompaction(HRegion region, HStore store, String why,
boolean giveUpIfRequestedOrCompacting) throws IOException;

/**
* on/off compaction
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,49 @@
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@Category(SmallTests.class)
public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
private final CompactionRequester compactionRequester = mock(CompactSplit.class);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class);

private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
public static AtomicInteger called = new AtomicInteger(0);

@Override
protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
byte[]... families) throws IOException {
Expand All @@ -79,9 +79,7 @@ protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableNam
}

@Test
public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException {
final CompactSplit compactSplit = new TestCompactSplit(HBaseConfiguration.create());
called.set(0);
public void shouldRequestCompactionAfterBulkLoad() throws IOException {
List<Pair<byte[], String>> familyPaths = new ArrayList<>();
// enough hfiles to request compaction
for (int i = 0; i < 5; i++) {
Expand All @@ -90,7 +88,7 @@ public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException {
try {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
when(regionServerServices.getConfiguration()).thenReturn(conf);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() {
@Override
Expand All @@ -105,77 +103,14 @@ public Object answer(InvocationOnMock invocation) {
}
});

HRegion region = testRegionWithFamilies(family1, family2, family3);
region.bulkLoadHFiles(familyPaths, false, null);
assertEquals(3, called.get());
Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
any(), any());
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
// invoke three times for 3 families
verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
} finally {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
}
}

@Test
public void testAvoidRepeatedlyRequestCompactAfterBulkLoad() throws IOException {
final CompactSplit compactSplit = new TestFamily1UnderCompact(HBaseConfiguration.create());
called.set(0);
List<Pair<byte[], String>> familyPaths = new ArrayList<>();
// enough hfiles to request compaction
for (int i = 0; i < 5; i++) {
familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
}
try {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
when(regionServerServices.getConfiguration()).thenReturn(conf);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
WALKeyImpl walKey = invocation.getArgument(1);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
walKey.setWriteEntry(we);
}
return 01L;
}
});

HRegion region = testRegionWithFamilies(family1, family2, family3);
region.bulkLoadHFiles(familyPaths, false, null);
// invoke two times for 2 families (family1 is already under compaction, so it is skipped)
assertEquals(2, called.get());
} finally {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
}
}

private class TestCompactSplit extends CompactSplit {

TestCompactSplit(Configuration conf) {
super(conf);
}

@Override
protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException {
called.addAndGet(1);
}
}

private class TestFamily1UnderCompact extends TestCompactSplit {

TestFamily1UnderCompact(Configuration conf) {
super(conf);
}

@Override
public boolean isUnderCompaction(final HStore s) {
if (s.getColumnFamilyName().equals(Bytes.toString(family1))) {
return true;
}
return super.isUnderCompaction(s);
}
}

}

0 comments on commit 2aafd31

Please sign in to comment.