Skip to content

Commit

Permalink
perf(ilp): fix ingress perf issue for small, but frequent transactions (
Browse files Browse the repository at this point in the history
  • Loading branch information
bluestreak01 authored Feb 22, 2024
1 parent 8c747f5 commit eaec412
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 47 deletions.
50 changes: 29 additions & 21 deletions core/src/main/java/io/questdb/PropServerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ public class PropServerConfiguration implements ServerConfiguration {
private final int systemO3ColumnMemorySize;
private final String systemTableNamePrefix;
private final long systemWalWriterDataAppendPageSize;
private final long systemWalWriterEventAppendPageSize;
private final long systemWriterDataAppendPageSize;
private final boolean tableTypeConversionEnabled;
private final TelemetryConfiguration telemetryConfiguration = new PropTelemetryConfiguration();
Expand Down Expand Up @@ -361,6 +362,7 @@ public class PropServerConfiguration implements ServerConfiguration {
private final boolean walSupported;
private final int walTxnNotificationQueueCapacity;
private final long walWriterDataAppendPageSize;
private final long walWriterEventAppendPageSize;
private final int walWriterPoolMaxSegments;
private final long workStealTimeoutNanos;
private final long writerAsyncCommandBusyWaitTimeout;
Expand Down Expand Up @@ -576,7 +578,9 @@ public PropServerConfiguration(
throw CairoException.critical(0).put("cairo.wal.segment.rollover.size must be 0 (disabled) or >= 1024 (1KiB)");
}
this.walWriterDataAppendPageSize = Files.ceilPageSize(getLongSize(properties, env, PropertyKey.CAIRO_WAL_WRITER_DATA_APPEND_PAGE_SIZE, Numbers.SIZE_1MB));
this.walWriterEventAppendPageSize = Files.ceilPageSize(getLongSize(properties, env, PropertyKey.CAIRO_WAL_WRITER_EVENT_APPEND_PAGE_SIZE, 128 * 1024));
this.systemWalWriterDataAppendPageSize = Files.ceilPageSize(getLongSize(properties, env, PropertyKey.CAIRO_SYSTEM_WAL_WRITER_DATA_APPEND_PAGE_SIZE, 256 * 1024));
this.systemWalWriterEventAppendPageSize = Files.ceilPageSize(getLongSize(properties, env, PropertyKey.CAIRO_SYSTEM_WAL_WRITER_EVENT_APPEND_PAGE_SIZE, 16 * 1024));
this.walSquashUncommittedRowsMultiplier = getDouble(properties, env, PropertyKey.CAIRO_WAL_SQUASH_UNCOMMITTED_ROWS_MULTIPLIER, "20.0");
this.walMaxLagTxnCount = getInt(properties, env, PropertyKey.CAIRO_WAL_MAX_LAG_TXN_COUNT, -1);
this.walMaxLagSize = getLongSize(properties, env, PropertyKey.CAIRO_WAL_MAX_LAG_SIZE, 75 * Numbers.SIZE_1MB);
Expand Down Expand Up @@ -622,10 +626,21 @@ public PropServerConfiguration(
int cpuAvailable = Runtime.getRuntime().availableProcessors();
int cpuUsed = 0;
int cpuSpare = 0;
if (cpuAvailable > 32) {
cpuSpare = 2;
int cpuIoWorkers = 0;
int cpuWalApplyWorkers = 2;

if (cpuAvailable > 8) {
cpuWalApplyWorkers = 3;
} else if (cpuAvailable > 16) {
cpuWalApplyWorkers = 4;
cpuSpare = 1;
// tested on 4/32/48 core servers
cpuIoWorkers = cpuAvailable / 2;
} else if (cpuAvailable > 32) {
cpuWalApplyWorkers = 4;
cpuSpare = 2;
// tested on 4/32/48 core servers
cpuIoWorkers = cpuAvailable / 2;
}

final FilesFacade ff = cairoConfiguration.getFilesFacade();
Expand Down Expand Up @@ -869,13 +884,7 @@ public PropServerConfiguration(
this.forceRecvFragmentationChunkSize = getInt(properties, env, PropertyKey.DEBUG_FORCE_RECV_FRAGMENTATION_CHUNK_SIZE, Integer.MAX_VALUE);
}

int walApplyWorkers = 2;
if (cpuAvailable > 16) {
walApplyWorkers = 4;
} else if (cpuAvailable > 8) {
walApplyWorkers = 3;
}
this.walApplyWorkerCount = getInt(properties, env, PropertyKey.WAL_APPLY_WORKER_COUNT, walApplyWorkers);
this.walApplyWorkerCount = getInt(properties, env, PropertyKey.WAL_APPLY_WORKER_COUNT, cpuWalApplyWorkers);
this.walApplyWorkerAffinity = getAffinity(properties, env, PropertyKey.WAL_APPLY_WORKER_AFFINITY, walApplyWorkerCount);
this.walApplyWorkerHaltOnError = getBoolean(properties, env, PropertyKey.WAL_APPLY_WORKER_HALT_ON_ERROR, false);
this.walApplyWorkerSleepThreshold = getLong(properties, env, PropertyKey.WAL_APPLY_WORKER_SLEEP_THRESHOLD, 10_000);
Expand Down Expand Up @@ -1155,17 +1164,7 @@ public PropServerConfiguration(
this.lineTcpWriterWorkerYieldThreshold = getLong(properties, env, PropertyKey.LINE_TCP_WRITER_WORKER_YIELD_THRESHOLD, 10);
this.lineTcpWriterWorkerSleepThreshold = getLong(properties, env, PropertyKey.LINE_TCP_WRITER_WORKER_SLEEP_THRESHOLD, 10_000);
this.symbolCacheWaitUsBeforeReload = getLong(properties, env, PropertyKey.LINE_TCP_SYMBOL_CACHE_WAIT_US_BEFORE_RELOAD, 500_000);

int ilpTcpWorkerCount;
if (cpuAvailable < 9) {
ilpTcpWorkerCount = 0;
} else if (cpuAvailable < 17) {
ilpTcpWorkerCount = 2;
} else {
ilpTcpWorkerCount = 6;
}
this.lineTcpIOWorkerCount = getInt(properties, env, PropertyKey.LINE_TCP_IO_WORKER_COUNT, ilpTcpWorkerCount);
cpuUsed += this.lineTcpIOWorkerCount;
this.lineTcpIOWorkerCount = getInt(properties, env, PropertyKey.LINE_TCP_IO_WORKER_COUNT, cpuIoWorkers);
this.lineTcpIOWorkerAffinity = getAffinity(properties, env, PropertyKey.LINE_TCP_IO_WORKER_AFFINITY, lineTcpIOWorkerCount);
this.lineTcpIOWorkerPoolHaltOnError = getBoolean(properties, env, PropertyKey.LINE_TCP_IO_HALT_ON_ERROR, false);
this.lineTcpIOWorkerYieldThreshold = getLong(properties, env, PropertyKey.LINE_TCP_IO_WORKER_YIELD_THRESHOLD, 10);
Expand Down Expand Up @@ -1220,7 +1219,6 @@ public PropServerConfiguration(

this.ilpAutoCreateNewColumns = getBoolean(properties, env, PropertyKey.LINE_AUTO_CREATE_NEW_COLUMNS, true);
this.ilpAutoCreateNewTables = getBoolean(properties, env, PropertyKey.LINE_AUTO_CREATE_NEW_TABLES, true);

this.sharedWorkerCount = getInt(properties, env, PropertyKey.SHARED_WORKER_COUNT, Math.max(4, cpuAvailable - cpuSpare - cpuUsed));
this.sharedWorkerAffinity = getAffinity(properties, env, PropertyKey.SHARED_WORKER_AFFINITY, sharedWorkerCount);
this.sharedWorkerHaltOnError = getBoolean(properties, env, PropertyKey.SHARED_WORKER_HALT_ON_ERROR, false);
Expand Down Expand Up @@ -2650,6 +2648,11 @@ public long getSystemWalDataAppendPageSize() {
return systemWalWriterDataAppendPageSize;
}

@Override
public long getSystemWalEventAppendPageSize() {
return systemWalWriterEventAppendPageSize;
}

@Override
public long getTableRegistryAutoReloadFrequency() {
return cairoTableRegistryAutoReloadFrequency;
Expand Down Expand Up @@ -2709,6 +2712,11 @@ public boolean getWalEnabledDefault() {
return walEnabledDefault;
}

@Override
public long getWalEventAppendPageSize() {
return walWriterEventAppendPageSize;
}

@Override
public long getWalMaxLagSize() {
return walMaxLagSize;
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/java/io/questdb/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,9 @@ public enum PropertyKey implements ConfigPropertyKey {
CAIRO_WAL_SEGMENT_ROLLOVER_ROW_COUNT("cairo.wal.segment.rollover.row.count"),
CAIRO_WAL_SEGMENT_ROLLOVER_SIZE("cairo.wal.segment.rollover.size"),
CAIRO_WAL_WRITER_DATA_APPEND_PAGE_SIZE("cairo.wal.writer.data.append.page.size"),
CAIRO_WAL_WRITER_EVENT_APPEND_PAGE_SIZE("cairo.wal.writer.event.append.page.size"),
CAIRO_SYSTEM_WAL_WRITER_DATA_APPEND_PAGE_SIZE("cairo.system.wal.writer.data.append.page.size"),
CAIRO_SYSTEM_WAL_WRITER_EVENT_APPEND_PAGE_SIZE("cairo.system.wal.writer.event.append.page.size"),
WAL_APPLY_WORKER_COUNT("wal.apply.worker.count"),
WAL_APPLY_WORKER_AFFINITY("wal.apply.worker.affinity"),
WAL_APPLY_WORKER_HALT_ON_ERROR("wal.apply.worker.haltOnError"),
Expand Down Expand Up @@ -490,13 +492,13 @@ public String getPropertyPath() {
}

@Override
public boolean isSensitive() {
return sensitive;
public boolean isDebug() {
return debug;
}

@Override
public boolean isDebug() {
return debug;
public boolean isSensitive() {
return sensitive;
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/questdb/cairo/CairoConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ default RostiAllocFacade getRostiAllocFacade() {

long getSystemWalDataAppendPageSize();

long getSystemWalEventAppendPageSize();

long getTableRegistryAutoReloadFrequency();

int getTableRegistryCompactionThreshold();
Expand All @@ -494,6 +496,8 @@ default RostiAllocFacade getRostiAllocFacade() {

long getWalDataAppendPageSize();

long getWalEventAppendPageSize();

boolean getWalEnabledDefault();

long getWalMaxLagSize();
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/questdb/cairo/CairoConfigurationWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,11 @@ public long getSystemWalDataAppendPageSize() {
return getDelegate().getSystemWalDataAppendPageSize();
}

@Override
public long getSystemWalEventAppendPageSize() {
return getDelegate().getSystemWalEventAppendPageSize();
}

@Override
public long getTableRegistryAutoReloadFrequency() {
return getDelegate().getTableRegistryAutoReloadFrequency();
Expand Down Expand Up @@ -895,6 +900,11 @@ public boolean getWalEnabledDefault() {
return getDelegate().getWalEnabledDefault();
}

@Override
public long getWalEventAppendPageSize() {
return getDelegate().getWalEventAppendPageSize();
}

@Override
public long getWalMaxLagSize() {
return getDelegate().getWalMaxLagSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,12 @@ public int getSystemO3ColumnMemorySize() {

@Override
public long getSystemWalDataAppendPageSize() {
return 256 * 1024;
return 16 * 1024;
}

@Override
public long getSystemWalEventAppendPageSize() {
return 128 * 1024;
}

@Override
Expand Down Expand Up @@ -896,6 +901,11 @@ public boolean getWalEnabledDefault() {
return false;
}

@Override
public long getWalEventAppendPageSize() {
return 64 * 1024;
}

@Override
public long getWalMaxLagSize() {
return 75 * Numbers.SIZE_1MB;
Expand Down
20 changes: 9 additions & 11 deletions core/src/main/java/io/questdb/cairo/wal/WalWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public class WalWriter implements TableWriterAPI {
private final AlterOperation alterOp = new AlterOperation();
private final ObjList<MemoryMA> columns;
private final CairoConfiguration configuration;
private final long dataAppendPageSize;
private final DdlListener ddlListener;
private final WalWriterEvents events;
private final FilesFacade ff;
Expand Down Expand Up @@ -133,11 +132,6 @@ public WalWriter(
this.metrics = metrics;
this.open = true;
this.symbolMapMem = Vm.getMARInstance(configuration.getCommitMode());
if (tableToken.isSystem()) {
this.dataAppendPageSize = configuration.getSystemWalDataAppendPageSize();
} else {
this.dataAppendPageSize = configuration.getWalDataAppendPageSize();
}

try {
lockWal();
Expand All @@ -155,7 +149,7 @@ public WalWriter(
initialSymbolCounts = new AtomicIntList(columnCount);
localSymbolIds = new IntList(columnCount);

events = new WalWriterEvents(ff);
events = new WalWriterEvents(configuration);
events.of(symbolMaps, initialSymbolCounts, symbolMapNullFlags);

configureColumns();
Expand Down Expand Up @@ -1172,14 +1166,18 @@ private void mkWalDir() {
path.trimTo(walDirLength);
}

private long getDataAppendPageSize() {
return tableToken.isSystem() ? configuration.getSystemWalDataAppendPageSize() : configuration.getWalDataAppendPageSize();
}

private void openColumnFiles(CharSequence name, int columnIndex, int pathTrimToLen) {
try {
final MemoryMA mem1 = getPrimaryColumn(columnIndex);
mem1.close(isTruncateFilesOnClose(), Vm.TRUNCATE_TO_POINTER);
mem1.of(
ff,
dFile(path.trimTo(pathTrimToLen), name),
dataAppendPageSize,
getDataAppendPageSize(),
-1,
MemoryTag.MMAP_TABLE_WRITER,
configuration.getWriterFileOpenOpts(),
Expand All @@ -1192,7 +1190,7 @@ private void openColumnFiles(CharSequence name, int columnIndex, int pathTrimToL
mem2.of(
ff,
iFile(path.trimTo(pathTrimToLen), name),
dataAppendPageSize,
getDataAppendPageSize(),
-1,
MemoryTag.MMAP_TABLE_WRITER,
configuration.getWriterFileOpenOpts(),
Expand Down Expand Up @@ -1245,7 +1243,7 @@ private void openNewSegment() {

segmentRowCount = 0;
metadata.switchTo(path, segmentPathLen, isTruncateFilesOnClose());
events.openEventFile(path, segmentPathLen, isTruncateFilesOnClose());
events.openEventFile(path, segmentPathLen, isTruncateFilesOnClose(), tableToken.isSystem());
if (commitMode != CommitMode.NOSYNC) {
events.sync();
}
Expand Down Expand Up @@ -1394,7 +1392,7 @@ private void rollLastWalEventRecord(int newSegmentId, long uncommittedRows) {
events.rollback();
}
path.trimTo(rootLen).slash().put(newSegmentId);
events.openEventFile(path, path.size(), isTruncateFilesOnClose());
events.openEventFile(path, path.size(), isTruncateFilesOnClose(), tableToken.isSystem());
if (isCommittingData) {
// When current transaction is not a data transaction but a column add transaction
// there is no need to add a record about it to the new segment event file.
Expand Down
20 changes: 15 additions & 5 deletions core/src/main/java/io/questdb/cairo/wal/WalWriterEvents.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import java.io.Closeable;

import static io.questdb.cairo.TableUtils.openSmallFile;
import static io.questdb.cairo.wal.WalUtils.*;

class WalWriterEvents implements Closeable {
Expand All @@ -51,9 +50,11 @@ class WalWriterEvents implements Closeable {
private BoolList symbolMapNullFlags;
private int txn = 0;
private ObjList<CharSequenceIntHashMap> txnSymbolMaps;
private final CairoConfiguration configuration;

WalWriterEvents(FilesFacade ff) {
this.ff = ff;
WalWriterEvents(CairoConfiguration configuration) {
this.configuration = configuration;
this.ff = configuration.getFilesFacade();
}

@Override
Expand Down Expand Up @@ -232,6 +233,7 @@ int appendData(long startRowID, long endRowID, long minTimestamp, long maxTimest
eventMem.putInt(-1);

appendIndex(eventMem.getAppendOffset() - Integer.BYTES);

eventMem.putInt(WALE_MAX_TXN_OFFSET_32, txn);
return txn++;
}
Expand Down Expand Up @@ -262,11 +264,19 @@ void of(ObjList<CharSequenceIntHashMap> txnSymbolMaps, AtomicIntList initialSymb
this.symbolMapNullFlags = symbolMapNullFlags;
}

void openEventFile(Path path, int pathLen, boolean truncate) {
void openEventFile(Path path, int pathLen, boolean truncate, boolean systemTable) {
if (eventMem.getFd() > -1) {
close(truncate, Vm.TRUNCATE_TO_POINTER);
}
openSmallFile(ff, path, pathLen, eventMem, EVENT_FILE_NAME, MemoryTag.MMAP_TABLE_WAL_WRITER);
eventMem.of(
ff,
path.trimTo(pathLen).concat(EVENT_FILE_NAME).$(),
systemTable ? configuration.getSystemWalEventAppendPageSize() : configuration.getWalEventAppendPageSize(),
-1,
MemoryTag.MMAP_TABLE_WAL_WRITER,
CairoConfiguration.O_NONE,
Files.POSIX_MADV_RANDOM
);
indexFd = ff.openRW(path.trimTo(pathLen).concat(EVENT_INDEX_FILE_NAME).$(), CairoConfiguration.O_NONE);
longBuffer = Unsafe.malloc(Long.BYTES, MemoryTag.MMAP_TABLE_WAL_WRITER);
init();
Expand Down
4 changes: 0 additions & 4 deletions core/src/main/java/io/questdb/std/Utf8StringIntHashMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,6 @@ private void erase(int index) {
values[index] = noEntryValue;
}

private int hash(CharSequence k) {
return Chars.hashCode(k) & mask;
}

private void move(int from, int to) {
keys[to] = keys[from];
hashCodes[to] = hashCodes[from];
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/resources/io/questdb/site/conf/server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,9 @@ query.timeout.sec=60
# mmap sliding page size that TableWriter uses to append data for each column specifically for System tables
#cairo.system.writer.data.append.page.size=256k

# mmap sliding page size that TableWriter uses to append events for each column specifically for System tables
#cairo.system.writer.even.append.page.size=16k

# Maximum wait timeout in milliseconds for ALTER TABLE SQL statement run via REST and PG Wire interfaces when statement execution is ASYNCHRONOUS
#cairo.writer.alter.busy.wait.timeout=500

Expand Down Expand Up @@ -711,6 +714,12 @@ query.timeout.sec=60
# mmap sliding page size that WalWriter uses to append data for each column
#cairo.wal.writer.data.append.page.size=1M

# mmap sliding page size that WalWriter uses to append events for each column
# this page size has performance impact on large number of small transactions, larger
# page will cope better. However, if the workload is that of small number of large transaction,
# this page size can be reduced. The optimal value should be established via ingestion benchmark.
#cairo.wal.writer.data.append.page.size=128k

# Multiplier to cairo.max.uncommitted.rows to calculate the limit of rows that can kept invisible when writing
# to WAL table under heavy load, when multiple transactions are to be applied.
# It is used to reduce the number Out Of Order commits when Out Of Order commits are unavoidable by squashing multiple commits together.
Expand Down
5 changes: 4 additions & 1 deletion core/src/test/java/io/questdb/test/ServerMainTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,10 @@ public void testShowParameters() throws Exception {
"wal.apply.worker.sleep.threshold\tQDB_WAL_APPLY_WORKER_SLEEP_THRESHOLD\t10000\tdefault\tfalse\tfalse\n" +
"wal.apply.worker.sleep.timeout\tQDB_WAL_APPLY_WORKER_SLEEP_TIMEOUT\t10\tdefault\tfalse\tfalse\n" +
"wal.apply.worker.yield.threshold\tQDB_WAL_APPLY_WORKER_YIELD_THRESHOLD\t10\tdefault\tfalse\tfalse\n" +
"cairo.sql.max.recompile.attempts\tQDB_CAIRO_SQL_MAX_RECOMPILE_ATTEMPTS\t10\tdefault\tfalse\tfalse")
"cairo.sql.max.recompile.attempts\tQDB_CAIRO_SQL_MAX_RECOMPILE_ATTEMPTS\t10\tdefault\tfalse\tfalse\n" +
"cairo.system.wal.writer.event.append.page.size\tQDB_CAIRO_SYSTEM_WAL_WRITER_EVENT_APPEND_PAGE_SIZE\t16384\tdefault\tfalse\tfalse\n" +
"cairo.wal.writer.event.append.page.size\tQDB_CAIRO_WAL_WRITER_EVENT_APPEND_PAGE_SIZE\t131072\tdefault\tfalse\tfalse\n"
)
.split("\n");

final Set<String> missingProps = new HashSet<>();
Expand Down

0 comments on commit eaec412

Please sign in to comment.