diff --git a/smart-common/src/main/java/org/smartdata/model/PathChecker.java b/smart-common/src/main/java/org/smartdata/model/PathChecker.java index 2129df744c..6fe52e3088 100644 --- a/smart-common/src/main/java/org/smartdata/model/PathChecker.java +++ b/smart-common/src/main/java/org/smartdata/model/PathChecker.java @@ -40,7 +40,7 @@ public class PathChecker { private static final String IGNORED_PATH_TEMPLATES_DELIMITER = ","; - private final Matcher patternMatcher; + private final ThreadLocal patternMatcherThreadLocal; private final List coverDirs; public PathChecker(SmartConf configuration) { @@ -52,13 +52,15 @@ public PathChecker(List ignoredPathPatterns, List coverDirs) { ignoredPathPatterns.forEach(patternBuilder::add); Pattern pattern = Pattern.compile(patternBuilder.toString()); - this.patternMatcher = pattern.matcher(""); + this.patternMatcherThreadLocal = + ThreadLocal.withInitial(() -> pattern.matcher("")); this.coverDirs = coverDirs; } public boolean isIgnored(String absolutePath) { - patternMatcher.reset(absolutePath); - return patternMatcher.find(); + return patternMatcherThreadLocal.get() + .reset(absolutePath) + .find(); } public boolean isCovered(String absolutePath) { diff --git a/smart-common/src/test/java/org/smartdata/model/TestPathCheckerThreadSafety.java b/smart-common/src/test/java/org/smartdata/model/TestPathCheckerThreadSafety.java new file mode 100644 index 0000000000..9e29bf77fc --- /dev/null +++ b/smart-common/src/test/java/org/smartdata/model/TestPathCheckerThreadSafety.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.smartdata.model; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +public class TestPathCheckerThreadSafety { + + private static final int NUM_WORKERS = 10; + private static final int NUM_PATHS = 1000; + + // we need to use paths of different lengths to potentially raise + // ArrayIndexOutOfBoundsException in case of wrong multithreading behavior in Matcher + private static final List TEST_PATHS = Arrays.asList( + "short_path", + "/some_dir/medium_size_path", + "/test/dir/another/dir/extraordinary_long_size_path" + ); + private static final String IGNORED_DIR = "/ignored/"; + + private ExecutorService executor; + private PathChecker pathChecker; + private CyclicBarrier startBarrier; + private AtomicInteger ignoredPathCounter; + + @Before + public void init() throws Exception { + executor = Executors.newFixedThreadPool(10); + pathChecker = new PathChecker( + Collections.singletonList("/ignored/*"), + Collections.emptyList()); + startBarrier = new CyclicBarrier(NUM_WORKERS); + ignoredPathCounter = new AtomicInteger(); + } + + @After + public void shutdown() { + executor.shutdownNow(); + } + + @Test + public void checkThreadSafety() throws Exception { + Random random = new Random(); + + CompletableFuture[] futures = IntStream.range(0, NUM_WORKERS) + .mapToObj(ignore -> startWorker(random)) + .toArray(CompletableFuture[]::new); + + CompletableFuture.allOf(futures) + .get(2, TimeUnit.SECONDS); + Assert.assertEquals(ignoredPathCounter.get(), NUM_WORKERS); + } + + private CompletableFuture startWorker(Random random) { + List paths = Stream.generate(() -> nextPath(random)) + .limit(NUM_PATHS) + .collect(Collectors.toList()); + + int ignoredPathIndex = random.nextInt(NUM_PATHS); + paths.set(ignoredPathIndex, IGNORED_DIR + paths.get(ignoredPathIndex)); + + return CompletableFuture.runAsync(new Worker(paths), executor); + } + + private String nextPath(Random random) { + int nextIndex = random.nextInt(TEST_PATHS.size()); + return TEST_PATHS.get(nextIndex); + } + + private class Worker implements Runnable { + private final List pathsToCheck; + + private Worker(List pathsToCheck) { + this.pathsToCheck = pathsToCheck; + } + + @Override + public void run() { + try { + startBarrier.await(1, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException("Error waiting start barrier", e); + } + + for (String path: pathsToCheck) { + if (pathChecker.isIgnored(path)) { + ignoredPathCounter.incrementAndGet(); + } + } + } + } +}