Skip to content

Commit

Permalink
[Feature]add metrics delete on write (DataLinkDC#3643)
Browse files Browse the repository at this point in the history
Co-authored-by: gaoyan1998 <[email protected]>
  • Loading branch information
gaoyan1998 and gaoyan1998 authored Jul 12, 2024
1 parent 91f8708 commit 778d482
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class MetricsContextHolder {
protected static final MetricsContextHolder instance = new MetricsContextHolder();

private final List<MetricsVO> metricsVOS = new CopyOnWriteArrayList<>();
private final AtomicLong lastDumpTime = new AtomicLong(System.currentTimeMillis());
private final AtomicLong lastDumpTime = new AtomicLong(0);

static {
String sql = String.format(
Expand Down
19 changes: 0 additions & 19 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.dinky.function.pool.UdfCodePool;
import org.dinky.job.ClearJobHistoryTask;
import org.dinky.job.FlinkJobTask;
import org.dinky.job.SystemMetricsTask;
import org.dinky.resource.BaseResourceManager;
import org.dinky.scheduler.client.ProjectClient;
import org.dinky.scheduler.exception.SchedulerException;
Expand All @@ -54,7 +53,6 @@

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.springframework.boot.ApplicationArguments;
Expand Down Expand Up @@ -146,23 +144,6 @@ private void initResources() {
* init task monitor
*/
private void initDaemon() {
SystemConfiguration sysConfig = SystemConfiguration.getInstances();

// Init system metrics task
DaemonTask sysMetricsTask = DaemonTask.build(new DaemonTaskConfig(SystemMetricsTask.TYPE));
Configuration<Boolean> metricsSysEnable = sysConfig.getMetricsSysEnable();
Configuration<Integer> sysGatherTiming = sysConfig.getMetricsSysGatherTiming();
Consumer<Configuration<?>> metricsListener = c -> {
c.addChangeEvent(x -> {
schedule.removeSchedule(sysMetricsTask.getType());
PeriodicTrigger trigger = new PeriodicTrigger(sysGatherTiming.getValue());
if (metricsSysEnable.getValue()) schedule.addSchedule(sysMetricsTask, trigger);
});
};
metricsListener.accept(metricsSysEnable);
metricsListener.accept(sysGatherTiming);
metricsSysEnable.runChangeEvent();

// Init clear job history task
DaemonTask clearJobHistoryTask = DaemonTask.build(new DaemonTaskConfig(ClearJobHistoryTask.TYPE));
schedule.addSchedule(clearJobHistoryTask, new PeriodicTrigger(1, TimeUnit.HOURS));
Expand Down
27 changes: 22 additions & 5 deletions dinky-admin/src/main/java/org/dinky/utils/SqliteUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -34,16 +35,18 @@ public enum SqliteUtil {
INSTANCE;

private Connection connection;
private final AtomicLong lastRecyle = new AtomicLong(0);

static {
try {
SqliteUtil.INSTANCE.connect("dinky.db");
SqliteUtil.INSTANCE.recyleData();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

public void connect(String dbPath) throws SQLException {
private void connect(String dbPath) throws SQLException {
connection = DriverManager.getConnection("jdbc:sqlite:" + dbPath);
}

Expand All @@ -57,11 +60,24 @@ public void createTable(String tableName, String columns) {
}
}

public void write(String tableName, String columns, String values) throws SQLException {
String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, columns, values);
public void executeSql(String sql) throws SQLException {
Statement pstmt = connection.createStatement();
pstmt.executeUpdate(sql);
connection.commit();
}

try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
pstmt.executeUpdate();
public void recyleData() {
long now = System.currentTimeMillis();
if (now - lastRecyle.get() < 1000 * 60 * 60) {
return;
}
lastRecyle.set(now);
try {
String sql = "DELETE FROM dinky_metrics WHERE heart_time <= datetime('now', '-7 days')";
executeSql(sql);
executeSql("VACUUM");
} catch (SQLException e) {
log.error("Failed to recyle database: " + e.getMessage());
}
}

Expand All @@ -81,6 +97,7 @@ public void write(String tableName, List<String> columns, List<List<String>> val
} catch (SQLException e) {
log.error("Failed to write to SQLite: " + e.getMessage());
}
recyleData();
}

private static String createInsertSql(String tableName, List<String> columns) {
Expand Down

0 comments on commit 778d482

Please sign in to comment.