From 9b4ed96e2a63cc7af68ea85bac51c0df99907fbb Mon Sep 17 00:00:00 2001 From: guqing <38999863+guqing@users.noreply.github.com> Date: Thu, 3 Nov 2022 14:32:19 +0800 Subject: [PATCH] fix: cannot stop thread when system is interrupted (#2639) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### What type of PR is this? /kind bug /kind improvement /area core /milestone 2.0 #### What this PR does / why we need it: 修复 Halo 异常停止时日志服务线程无法中断的问题 #### Special notes for your reviewer: how to test it? 使用 mysql 启动 halo 但不启动 mysql,此时 halo 会无法链接 mysql 期望现象:Halo 服务终止,异常现象:Halo 抛异常但 Netty 服务器不会停止 /cc @halo-dev/sig-halo #### Does this PR introduce a user-facing change? ```release-note 修复 Halo 异常停止时日志服务线程无法中断的问题 ``` --- .../halo/app/metrics/CounterMeterHandler.java | 25 +++- .../run/halo/app/metrics/VisitLogWriter.java | 124 +++++++++--------- .../halo/app/metrics/VisitLogWriterTest.java | 53 ++++++++ 3 files changed, 136 insertions(+), 66 deletions(-) create mode 100644 src/test/java/run/halo/app/metrics/VisitLogWriterTest.java diff --git a/src/main/java/run/halo/app/metrics/CounterMeterHandler.java b/src/main/java/run/halo/app/metrics/CounterMeterHandler.java index 16ed6baacd..c93463b36d 100644 --- a/src/main/java/run/halo/app/metrics/CounterMeterHandler.java +++ b/src/main/java/run/halo/app/metrics/CounterMeterHandler.java @@ -9,8 +9,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.DisposableBean; import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.SmartLifecycle; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -29,8 +29,8 @@ */ @Slf4j @Component -public class CounterMeterHandler implements DisposableBean { - +public class CounterMeterHandler implements SmartLifecycle { + private volatile boolean started = false; private final ReactiveExtensionClient client; private final MeterRegistry meterRegistry; @@ -135,8 +135,23 @@ static Counter emptyCounter(String name) { } @Override - public void destroy() { + public void start() { + this.started = true; + } + + @Override + public void stop() { log.debug("Persist counter meters to database before destroy..."); - save().block(); + try { + save().block(); + } catch (Exception e) { + log.error("Persist counter meters to database failed.", e); + } + this.started = false; + } + + @Override + public boolean isRunning() { + return started; } } diff --git a/src/main/java/run/halo/app/metrics/VisitLogWriter.java b/src/main/java/run/halo/app/metrics/VisitLogWriter.java index f25ec84556..7d91e67049 100644 --- a/src/main/java/run/halo/app/metrics/VisitLogWriter.java +++ b/src/main/java/run/halo/app/metrics/VisitLogWriter.java @@ -7,16 +7,17 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.time.Instant; -import java.util.ArrayDeque; -import java.util.Deque; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import reactor.core.Disposable; import run.halo.app.infra.properties.HaloProperties; import run.halo.app.infra.utils.FileUtils; @@ -33,10 +34,12 @@ public class VisitLogWriter implements InitializingBean, DisposableBean { private static final String LOG_FILE_LOCATION = "logs"; private final AsyncLogWriter asyncLogWriter; private volatile boolean interruptThread = false; + private volatile boolean started = false; + private final ExecutorService executorService; private final Path logFilePath; - private VisitLogWriter(HaloProperties haloProperties) throws IOException { + public VisitLogWriter(HaloProperties haloProperties) throws IOException { Path logsPath = haloProperties.getWorkDir() .resolve(LOG_FILE_LOCATION); if (!Files.exists(logsPath)) { @@ -44,10 +47,15 @@ private VisitLogWriter(HaloProperties haloProperties) throws IOException { } this.logFilePath = logsPath.resolve(LOG_FILE_NAME); this.asyncLogWriter = new AsyncLogWriter(logFilePath); + this.executorService = Executors.newFixedThreadPool(1); } - public synchronized void log(String logMsg) { - asyncLogWriter.put(logMsg); + public void log(String logMsg) { + try { + asyncLogWriter.put(logMsg); + } catch (InterruptedException e) { + log.error("Failed to log visit log: {}", ExceptionUtils.getStackTrace(e)); + } } public Path getLogFilePath() { @@ -55,13 +63,22 @@ public Path getLogFilePath() { } void start() { + if (started) { + return; + } log.debug("Starting write visit log..."); - Thread thread = new Thread(() -> { - while (!interruptThread) { - asyncLogWriter.writeLog(); + this.started = true; + executorService.submit(() -> { + while (!interruptThread && !Thread.currentThread().isInterrupted()) { + try { + asyncLogWriter.writeLog(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("VisitLogWrite thread [{}] interrupted", + Thread.currentThread().getName()); + } } - }, "visits-log-writer"); - thread.start(); + }); } @Override @@ -69,21 +86,29 @@ public void afterPropertiesSet() throws Exception { start(); } + public boolean isStarted() { + return started; + } + + public long queuedSize() { + return asyncLogWriter.logQueue.size(); + } + @Override public void destroy() throws Exception { - asyncLogWriter.close(); + this.started = false; interruptThread = true; + asyncLogWriter.dispose(); + executorService.shutdown(); } - static class AsyncLogWriter { + static class AsyncLogWriter implements Disposable { private static final int MAX_LOG_SIZE = 10000; private static final int BATCH_SIZE = 10; - private final ReentrantLock lock = new ReentrantLock(); - private final Condition fullCondition = lock.newCondition(); - private final Condition emptyCondition = lock.newCondition(); private final BufferedOutputStream writer; - private final Deque logQueue; + private final Queue logQueue; private final AtomicInteger logBatch = new AtomicInteger(0); + private volatile boolean disposed = false; public AsyncLogWriter(Path logFilePath) { OutputStream outputStream; @@ -95,33 +120,20 @@ public AsyncLogWriter(Path logFilePath) { throw new RuntimeException(e); } this.writer = new BufferedOutputStream(outputStream); - this.logQueue = new ArrayDeque<>(); + this.logQueue = new LinkedBlockingDeque<>(MAX_LOG_SIZE); } - public void writeLog() { - lock.lock(); - try { - // queue is empty, wait for new log - while (logQueue.isEmpty()) { - try { - emptyCondition.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - String logMessage = logQueue.poll(); - writeToDisk(logMessage); - log.debug("Consumption visit log message: [{}]", logMessage); - // signal log producer - fullCondition.signal(); - } finally { - lock.unlock(); + public void writeLog() throws InterruptedException { + if (logQueue.isEmpty()) { + return; } + String logMessage = logQueue.poll(); + writeToDisk(logMessage); + log.debug("Consumption visit log message: [{}]", logMessage); } - void writeToDisk(String logMsg) { + void writeToDisk(String logMsg) throws InterruptedException { String format = String.format("%s %s\n", Instant.now(), logMsg); - lock.lock(); try { writer.write(format.getBytes(), 0, format.length()); int size = logBatch.incrementAndGet(); @@ -131,33 +143,18 @@ void writeToDisk(String logMsg) { } } catch (IOException e) { log.warn("Record access log failure: ", ExceptionUtils.getRootCause(e)); - } finally { - lock.unlock(); } } - public void put(String logMessage) { - lock.lock(); - try { - while (logQueue.size() == MAX_LOG_SIZE) { - try { - log.debug("Queue full, producer thread waiting..."); - fullCondition.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - // add log message to queue tail - logQueue.add(logMessage); - log.info("Production a log messages [{}]", logMessage); - // signal consumer thread - emptyCondition.signal(); - } finally { - lock.unlock(); - } + public void put(String logMessage) throws InterruptedException { + // add log message to queue tail + logQueue.add(logMessage); + log.info("Production a log messages [{}]", logMessage); } - public void close() { + @Override + public void dispose() { + this.disposed = true; if (writer != null) { try { writer.flush(); @@ -167,5 +164,10 @@ public void close() { FileUtils.closeQuietly(writer); } } + + @Override + public boolean isDisposed() { + return this.disposed; + } } } diff --git a/src/test/java/run/halo/app/metrics/VisitLogWriterTest.java b/src/test/java/run/halo/app/metrics/VisitLogWriterTest.java new file mode 100644 index 0000000000..550359d783 --- /dev/null +++ b/src/test/java/run/halo/app/metrics/VisitLogWriterTest.java @@ -0,0 +1,53 @@ +package run.halo.app.metrics; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.util.FileSystemUtils; +import run.halo.app.infra.properties.HaloProperties; + +/** + * Tests for {@link VisitLogWriter}. + * + * @author guqing + * @since 2.0.0 + */ +@ExtendWith(MockitoExtension.class) +class VisitLogWriterTest { + + @Mock + private HaloProperties haloProperties; + + private VisitLogWriter visitLogWriter; + + private Path workDir; + + @BeforeEach + void setUp() throws IOException { + workDir = Files.createTempDirectory("halo-visitlog"); + when(haloProperties.getWorkDir()).thenReturn(workDir); + visitLogWriter = new VisitLogWriter(haloProperties); + } + + @AfterEach + void tearDown() throws Exception { + visitLogWriter.destroy(); + FileSystemUtils.deleteRecursively(workDir); + } + + @Test + void start() { + assertThat(visitLogWriter.isStarted()).isFalse(); + visitLogWriter.start(); + assertThat(visitLogWriter.isStarted()).isTrue(); + } +} \ No newline at end of file