Skip to content

Commit

Permalink
[ADH-4134] Use thread local matcher in PathChecker to prevent race co…
Browse files Browse the repository at this point in the history
…ndition of SmartServer RPC server threads checking ignored paths
  • Loading branch information
tigrulya-exe committed Mar 11, 2024
1 parent fd7a1e3 commit 45acb35
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 4 deletions.
10 changes: 6 additions & 4 deletions smart-common/src/main/java/org/smartdata/model/PathChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class PathChecker {
private static final String IGNORED_PATH_TEMPLATES_DELIMITER = ",";

private final Matcher patternMatcher;
private final ThreadLocal<Matcher> patternMatcherThreadLocal;
private final List<String> coverDirs;

public PathChecker(SmartConf configuration) {
Expand All @@ -52,13 +52,15 @@ public PathChecker(List<String> ignoredPathPatterns, List<String> coverDirs) {
ignoredPathPatterns.forEach(patternBuilder::add);

Pattern pattern = Pattern.compile(patternBuilder.toString());
this.patternMatcher = pattern.matcher("");
this.patternMatcherThreadLocal =
ThreadLocal.withInitial(() -> pattern.matcher(""));
this.coverDirs = coverDirs;
}

public boolean isIgnored(String absolutePath) {
patternMatcher.reset(absolutePath);
return patternMatcher.find();
return patternMatcherThreadLocal.get()
.reset(absolutePath)
.find();
}

public boolean isCovered(String absolutePath) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.smartdata.model;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class TestPathCheckerThreadSafety {

private static final int NUM_WORKERS = 10;
private static final int NUM_PATHS = 1000;

// we need to use paths of different lengths to potentially raise
// ArrayIndexOutOfBoundsException in case of wrong multithreading behavior in Matcher
private static final List<String> TEST_PATHS = Arrays.asList(
"short_path",
"/some_dir/medium_size_path",
"/test/dir/another/dir/extraordinary_long_size_path"
);
private static final String IGNORED_DIR = "/ignored/";

private ExecutorService executor;
private PathChecker pathChecker;
private CyclicBarrier startBarrier;
private AtomicInteger ignoredPathCounter;

@Before
public void init() throws Exception {
executor = Executors.newFixedThreadPool(10);
pathChecker = new PathChecker(
Collections.singletonList("/ignored/*"),
Collections.emptyList());
startBarrier = new CyclicBarrier(NUM_WORKERS);
ignoredPathCounter = new AtomicInteger();
}

@After
public void shutdown() {
executor.shutdownNow();
}

@Test
public void checkThreadSafety() throws Exception {
Random random = new Random();

CompletableFuture<?>[] futures = IntStream.range(0, NUM_WORKERS)
.mapToObj(ignore -> startWorker(random))
.toArray(CompletableFuture[]::new);

CompletableFuture.allOf(futures)
.get(2, TimeUnit.SECONDS);
Assert.assertEquals(ignoredPathCounter.get(), NUM_WORKERS);
}

private CompletableFuture<?> startWorker(Random random) {
List<String> paths = Stream.generate(() -> nextPath(random))
.limit(NUM_PATHS)
.collect(Collectors.toList());

int ignoredPathIndex = random.nextInt(NUM_PATHS);
paths.set(ignoredPathIndex, IGNORED_DIR + paths.get(ignoredPathIndex));

return CompletableFuture.runAsync(new Worker(paths), executor);
}

private String nextPath(Random random) {
int nextIndex = random.nextInt(TEST_PATHS.size());
return TEST_PATHS.get(nextIndex);
}

private class Worker implements Runnable {
private final List<String> pathsToCheck;

private Worker(List<String> pathsToCheck) {
this.pathsToCheck = pathsToCheck;
}

@Override
public void run() {
try {
startBarrier.await(1, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Error waiting start barrier", e);
}

for (String path: pathsToCheck) {
if (pathChecker.isIgnored(path)) {
ignoredPathCounter.incrementAndGet();
}
}
}
}
}

0 comments on commit 45acb35

Please sign in to comment.