Skip to content

Commit

Permalink
fix: cannot stop thread when system is interrupted (#2639)
Browse files Browse the repository at this point in the history
#### What type of PR is this?
/kind bug
/kind improvement
/area core
/milestone 2.0

#### What this PR does / why we need it:
修复 Halo 异常停止时日志服务线程无法中断的问题

#### Special notes for your reviewer:
how to test it?
使用 mysql 启动 halo 但不启动 mysql,此时 halo 会无法链接 mysql
期望现象:Halo 服务终止,异常现象:Halo 抛异常但 Netty 服务器不会停止

/cc @halo-dev/sig-halo 
#### Does this PR introduce a user-facing change?

```release-note
修复 Halo 异常停止时日志服务线程无法中断的问题
```
  • Loading branch information
guqing authored Nov 3, 2022
1 parent 73df5e4 commit 9b4ed96
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 66 deletions.
25 changes: 20 additions & 5 deletions src/main/java/run/halo/app/metrics/CounterMeterHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
Expand All @@ -29,8 +29,8 @@
*/
@Slf4j
@Component
public class CounterMeterHandler implements DisposableBean {

public class CounterMeterHandler implements SmartLifecycle {
private volatile boolean started = false;
private final ReactiveExtensionClient client;
private final MeterRegistry meterRegistry;

Expand Down Expand Up @@ -135,8 +135,23 @@ static Counter emptyCounter(String name) {
}

@Override
public void destroy() {
public void start() {
this.started = true;
}

@Override
public void stop() {
log.debug("Persist counter meters to database before destroy...");
save().block();
try {
save().block();
} catch (Exception e) {
log.error("Persist counter meters to database failed.", e);
}
this.started = false;
}

@Override
public boolean isRunning() {
return started;
}
}
124 changes: 63 additions & 61 deletions src/main/java/run/halo/app/metrics/VisitLogWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import run.halo.app.infra.properties.HaloProperties;
import run.halo.app.infra.utils.FileUtils;

Expand All @@ -33,57 +34,81 @@ public class VisitLogWriter implements InitializingBean, DisposableBean {
private static final String LOG_FILE_LOCATION = "logs";
private final AsyncLogWriter asyncLogWriter;
private volatile boolean interruptThread = false;
private volatile boolean started = false;
private final ExecutorService executorService;

private final Path logFilePath;

private VisitLogWriter(HaloProperties haloProperties) throws IOException {
public VisitLogWriter(HaloProperties haloProperties) throws IOException {
Path logsPath = haloProperties.getWorkDir()
.resolve(LOG_FILE_LOCATION);
if (!Files.exists(logsPath)) {
Files.createDirectories(logsPath);
}
this.logFilePath = logsPath.resolve(LOG_FILE_NAME);
this.asyncLogWriter = new AsyncLogWriter(logFilePath);
this.executorService = Executors.newFixedThreadPool(1);
}

public synchronized void log(String logMsg) {
asyncLogWriter.put(logMsg);
public void log(String logMsg) {
try {
asyncLogWriter.put(logMsg);
} catch (InterruptedException e) {
log.error("Failed to log visit log: {}", ExceptionUtils.getStackTrace(e));
}
}

public Path getLogFilePath() {
return logFilePath;
}

void start() {
if (started) {
return;
}
log.debug("Starting write visit log...");
Thread thread = new Thread(() -> {
while (!interruptThread) {
asyncLogWriter.writeLog();
this.started = true;
executorService.submit(() -> {
while (!interruptThread && !Thread.currentThread().isInterrupted()) {
try {
asyncLogWriter.writeLog();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("VisitLogWrite thread [{}] interrupted",
Thread.currentThread().getName());
}
}
}, "visits-log-writer");
thread.start();
});
}

@Override
public void afterPropertiesSet() throws Exception {
start();
}

public boolean isStarted() {
return started;
}

public long queuedSize() {
return asyncLogWriter.logQueue.size();
}

@Override
public void destroy() throws Exception {
asyncLogWriter.close();
this.started = false;
interruptThread = true;
asyncLogWriter.dispose();
executorService.shutdown();
}

static class AsyncLogWriter {
static class AsyncLogWriter implements Disposable {
private static final int MAX_LOG_SIZE = 10000;
private static final int BATCH_SIZE = 10;
private final ReentrantLock lock = new ReentrantLock();
private final Condition fullCondition = lock.newCondition();
private final Condition emptyCondition = lock.newCondition();
private final BufferedOutputStream writer;
private final Deque<String> logQueue;
private final Queue<String> logQueue;
private final AtomicInteger logBatch = new AtomicInteger(0);
private volatile boolean disposed = false;

public AsyncLogWriter(Path logFilePath) {
OutputStream outputStream;
Expand All @@ -95,33 +120,20 @@ public AsyncLogWriter(Path logFilePath) {
throw new RuntimeException(e);
}
this.writer = new BufferedOutputStream(outputStream);
this.logQueue = new ArrayDeque<>();
this.logQueue = new LinkedBlockingDeque<>(MAX_LOG_SIZE);
}

public void writeLog() {
lock.lock();
try {
// queue is empty, wait for new log
while (logQueue.isEmpty()) {
try {
emptyCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String logMessage = logQueue.poll();
writeToDisk(logMessage);
log.debug("Consumption visit log message: [{}]", logMessage);
// signal log producer
fullCondition.signal();
} finally {
lock.unlock();
public void writeLog() throws InterruptedException {
if (logQueue.isEmpty()) {
return;
}
String logMessage = logQueue.poll();
writeToDisk(logMessage);
log.debug("Consumption visit log message: [{}]", logMessage);
}

void writeToDisk(String logMsg) {
void writeToDisk(String logMsg) throws InterruptedException {
String format = String.format("%s %s\n", Instant.now(), logMsg);
lock.lock();
try {
writer.write(format.getBytes(), 0, format.length());
int size = logBatch.incrementAndGet();
Expand All @@ -131,33 +143,18 @@ void writeToDisk(String logMsg) {
}
} catch (IOException e) {
log.warn("Record access log failure: ", ExceptionUtils.getRootCause(e));
} finally {
lock.unlock();
}
}

public void put(String logMessage) {
lock.lock();
try {
while (logQueue.size() == MAX_LOG_SIZE) {
try {
log.debug("Queue full, producer thread waiting...");
fullCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// add log message to queue tail
logQueue.add(logMessage);
log.info("Production a log messages [{}]", logMessage);
// signal consumer thread
emptyCondition.signal();
} finally {
lock.unlock();
}
public void put(String logMessage) throws InterruptedException {
// add log message to queue tail
logQueue.add(logMessage);
log.info("Production a log messages [{}]", logMessage);
}

public void close() {
@Override
public void dispose() {
this.disposed = true;
if (writer != null) {
try {
writer.flush();
Expand All @@ -167,5 +164,10 @@ public void close() {
FileUtils.closeQuietly(writer);
}
}

@Override
public boolean isDisposed() {
return this.disposed;
}
}
}
53 changes: 53 additions & 0 deletions src/test/java/run/halo/app/metrics/VisitLogWriterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package run.halo.app.metrics;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.util.FileSystemUtils;
import run.halo.app.infra.properties.HaloProperties;

/**
* Tests for {@link VisitLogWriter}.
*
* @author guqing
* @since 2.0.0
*/
@ExtendWith(MockitoExtension.class)
class VisitLogWriterTest {

@Mock
private HaloProperties haloProperties;

private VisitLogWriter visitLogWriter;

private Path workDir;

@BeforeEach
void setUp() throws IOException {
workDir = Files.createTempDirectory("halo-visitlog");
when(haloProperties.getWorkDir()).thenReturn(workDir);
visitLogWriter = new VisitLogWriter(haloProperties);
}

@AfterEach
void tearDown() throws Exception {
visitLogWriter.destroy();
FileSystemUtils.deleteRecursively(workDir);
}

@Test
void start() {
assertThat(visitLogWriter.isStarted()).isFalse();
visitLogWriter.start();
assertThat(visitLogWriter.isStarted()).isTrue();
}
}

0 comments on commit 9b4ed96

Please sign in to comment.