Skip to content

Commit

Permalink
enable tsan
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Apr 27, 2024
1 parent ac08521 commit 4941eff
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 36 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/ci-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,37 @@ jobs:
- name: Java test
run: make javatest

clang-build-test-with-tsan:
name: clang build and test with tsan
needs: [clang-format, sanity-checks, python-lint-check]
runs-on: kuzu-self-hosted-testing
env:
NUM_THREADS: 32
TEST_JOBS: 16
CC: clang
CXX: clang++
UW_S3_ACCESS_KEY_ID: ${{ secrets.UW_S3_ACCESS_KEY_ID }}
UW_S3_SECRET_ACCESS_KEY: ${{ secrets.UW_S3_SECRET_ACCESS_KEY }}
AWS_S3_ACCESS_KEY_ID: ${{ secrets.AWS_S3_ACCESS_KEY_ID }}
AWS_S3_SECRET_ACCESS_KEY: ${{ secrets.AWS_S3_SECRET_ACCESS_KEY }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_S3_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_S3_SECRET_ACCESS_KEY }}
RUN_ID: "$(hostname)-$(date +%s)"
steps:
- uses: actions/checkout@v3

- name: Ensure Python dependencies
run: |
pip install torch~=2.0.0 --extra-index-url https://download.pytorch.org/whl/cpu
pip install --user -r tools/python_api/requirements_dev.txt -f https://data.pyg.org/whl/torch-2.0.0+cpu.html
- name: Ensure Node.js dependencies
run: npm install --include=dev
working-directory: tools/nodejs_api

- name: Test
run: make test TSAN=1

msvc-build-test:
name: msvc build & test
needs: [clang-format, sanity-checks, python-lint-check]
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ if(NOT MSVC)
endif()

if(${ENABLE_THREAD_SANITIZER})
add_compile_definitions(KUZU_TSAN)
if(MSVC)
message(FATAL_ERROR "Thread sanitizer is not supported on MSVC")
else()
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ struct BufferPoolConstants {
// The default max size for a VMRegion.
#ifdef __32BIT__
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 30; // (1GB)
#elif KUZU_TSAN
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 36; // (64GB)
#else
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 43; // (8TB)
#endif
Expand Down
63 changes: 36 additions & 27 deletions src/include/processor/operator/mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace processor {

// Note: Classes in this file are NOT thread-safe.
struct MaskUtil {
static inline common::offset_t getMorselIdx(common::offset_t offset) {
static common::offset_t getMorselIdx(common::offset_t offset) {
return offset >> common::DEFAULT_VECTOR_CAPACITY_LOG_2;
}
};
Expand All @@ -23,8 +23,22 @@ struct MaskData {
std::fill(data, data + size, 0);
}

inline void setMask(uint64_t pos, uint8_t maskValue) const { data[pos] = maskValue; }
inline bool isMasked(uint64_t pos, uint8_t trueMaskVal) const {
#if KUZU_TSAN
#if defined(__has_feature) && __has_feature(thread_sanitizer)
__attribute__((no_sanitize("thread")))
#endif
#endif
void
setMask(uint64_t pos, uint8_t maskValue) const {
data[pos] = maskValue;
}
#if KUZU_TSAN
#if defined(__has_feature) && __has_feature(thread_sanitizer)
__attribute__((no_sanitize("thread")))
#endif
#endif
bool
isMasked(uint64_t pos, uint8_t trueMaskVal) const {
return data[pos] == trueMaskVal;
}

Expand All @@ -37,25 +51,26 @@ class MaskCollection {
public:
MaskCollection() : numMasks{0} {}

inline void init(common::offset_t maxOffset) {
void init(common::offset_t maxOffset) {
std::unique_lock lck{mtx};
if (maskData != nullptr) { // MaskCollection might be initialized repeatedly.
return;
}
maskData = std::make_unique<MaskData>(maxOffset + 1);
}

inline bool isMasked(common::offset_t offset) { return maskData->isMasked(offset, numMasks); }
bool isMasked(common::offset_t offset) { return maskData->isMasked(offset, numMasks); }

// Increment mask value for the given nodeOffset if its current mask value is equal to
// the specified `currentMaskValue`.
inline void incrementMaskValue(common::offset_t offset, uint8_t currentMaskValue) {
void incrementMaskValue(common::offset_t offset, uint8_t currentMaskValue) {
if (maskData->isMasked(offset, currentMaskValue)) {
maskData->setMask(offset, currentMaskValue + 1);
}
}

inline uint8_t getNumMasks() const { return numMasks; }
inline void incrementNumMasks() { numMasks++; }
uint8_t getNumMasks() const { return numMasks; }
void incrementNumMasks() { numMasks++; }

private:
std::mutex mtx;
Expand All @@ -75,8 +90,8 @@ class NodeSemiMask {
virtual uint8_t getNumMasks() const = 0;
virtual void incrementNumMasks() = 0;

inline bool isEnabled() const { return getNumMasks() > 0; }
inline storage::NodeTable* getNodeTable() const { return nodeTable; }
bool isEnabled() const { return getNumMasks() > 0; }
storage::NodeTable* getNodeTable() const { return nodeTable; }

protected:
storage::NodeTable* nodeTable;
Expand All @@ -88,24 +103,22 @@ class NodeOffsetSemiMask : public NodeSemiMask {
offsetMask = std::make_unique<MaskCollection>();
}

inline void init(transaction::Transaction* trx) override {
void init(transaction::Transaction* trx) override {
auto maxNodeOffset = nodeTable->getMaxNodeOffset(trx);
if (maxNodeOffset == common::INVALID_OFFSET) {
return;
}
offsetMask->init(nodeTable->getMaxNodeOffset(trx) + 1);
}

inline void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) override {
void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) override {
offsetMask->incrementMaskValue(nodeOffset, currentMaskValue);
}

inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); }
inline void incrementNumMasks() override { offsetMask->incrementNumMasks(); }
uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); }
void incrementNumMasks() override { offsetMask->incrementNumMasks(); }

inline bool isNodeMasked(common::offset_t nodeOffset) {
return offsetMask->isMasked(nodeOffset);
}
bool isNodeMasked(common::offset_t nodeOffset) { return offsetMask->isMasked(nodeOffset); }

private:
std::unique_ptr<MaskCollection> offsetMask;
Expand All @@ -118,7 +131,7 @@ class NodeOffsetAndMorselSemiMask : public NodeSemiMask {
morselMask = std::make_unique<MaskCollection>();
}

inline void init(transaction::Transaction* trx) override {
void init(transaction::Transaction* trx) override {
auto maxNodeOffset = nodeTable->getMaxNodeOffset(trx);
if (maxNodeOffset == common::INVALID_OFFSET) {
return;
Expand All @@ -129,23 +142,19 @@ class NodeOffsetAndMorselSemiMask : public NodeSemiMask {

// Note: blindly update mask does not parallelize well, so we minimize write by first checking
// if the mask is set to true (mask value is equal to the expected currentMaskValue) or not.
inline void incrementMaskValue(uint64_t nodeOffset, uint8_t currentMaskValue) override {
void incrementMaskValue(uint64_t nodeOffset, uint8_t currentMaskValue) override {
offsetMask->incrementMaskValue(nodeOffset, currentMaskValue);
morselMask->incrementMaskValue(MaskUtil::getMorselIdx(nodeOffset), currentMaskValue);
}

inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); }
inline void incrementNumMasks() override {
uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); }
void incrementNumMasks() override {
offsetMask->incrementNumMasks();
morselMask->incrementNumMasks();
}

inline bool isMorselMasked(common::offset_t morselIdx) {
return morselMask->isMasked(morselIdx);
}
inline bool isNodeMasked(common::offset_t nodeOffset) {
return offsetMask->isMasked(nodeOffset);
}
bool isMorselMasked(common::offset_t morselIdx) { return morselMask->isMasked(morselIdx); }
bool isNodeMasked(common::offset_t nodeOffset) { return offsetMask->isMasked(nodeOffset); }

private:
std::unique_ptr<MaskCollection> offsetMask;
Expand Down
10 changes: 9 additions & 1 deletion src/include/storage/stats/property_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ class PropertyStatistics {
void serialize(common::Serializer& serializer) const;
static std::unique_ptr<PropertyStatistics> deserialize(common::Deserializer& deserializer);

inline void setHasNull() { mayHaveNullValue = true; }
#if KUZU_TSAN
#if defined(__has_feature) && __has_feature(thread_sanitizer)
__attribute__((no_sanitize("thread")))
#endif
#endif
void
setHasNull() {
mayHaveNullValue = true;
}

private:
// Stores whether or not the property is known to have contained a null value
Expand Down
11 changes: 10 additions & 1 deletion src/include/storage/stats/table_statistics_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ class TablesStatistics {

void initTableStatisticsForWriteTrx();

#if KUZU_TSAN
#if defined(__has_feature) && __has_feature(thread_sanitizer)
__attribute__((no_sanitize("thread")))
#endif
#endif
void
setToUpdated() {
isUpdated = true;
}

protected:
virtual std::unique_ptr<TableStatistics> constructTableStatistic(
catalog::TableCatalogEntry* tableEntry) = 0;
Expand All @@ -101,7 +111,6 @@ class TablesStatistics {

void initTableStatisticsForWriteTrxNoLock();

void setToUpdated() { isUpdated = true; }
void resetToNotUpdated() { isUpdated = false; }

protected:
Expand Down
4 changes: 4 additions & 0 deletions src/processor/operator/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,11 @@ bool PhysicalOperator::getNextTuple(ExecutionContext* context) {
}
metrics->executionTime.start();
auto result = getNextTuplesInternal(context);
#if KUZU_TSAN
#if !defined(__has_feature) || !__has_feature(thread_sanitizer)
context->clientContext->getProgressBar()->updateProgress(getProgress(context));
#endif
#endif
metrics->executionTime.stop();
return result;
}
Expand Down
13 changes: 7 additions & 6 deletions src/storage/stats/property_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ void RWPropertyStats::setHasNull(const transaction::Transaction& transaction) {
// TODO(Guodong): INVALID_PROPERTY_ID is used here because we have a column, i.e., nbrIDColumn,
// not exposed as property in table schema, but still have nullColumn. Should be fixed once we
// properly align properties and chunks.
if (propertyID != common::INVALID_PROPERTY_ID) {
KU_ASSERT(tablesStatistics);
auto propStats =
tablesStatistics->getPropertyStatisticsForTable(transaction, tableID, propertyID);
propStats.setHasNull();
tablesStatistics->setPropertyStatisticsForTable(tableID, propertyID, propStats);
if (propertyID == common::INVALID_PROPERTY_ID) {
return;
}
KU_ASSERT(tablesStatistics && transaction.isWriteTransaction());
auto& propStats =
tablesStatistics->getPropertyStatisticsForTable(transaction, tableID, propertyID);
propStats.setHasNull();
tablesStatistics->setToUpdated();
}

} // namespace storage
Expand Down
2 changes: 1 addition & 1 deletion src/storage/stats/table_statistics_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void TablesStatistics::setPropertyStatisticsForTable(table_id_t tableID, propert
KU_ASSERT(readWriteVersion && readWriteVersion->tableStatisticPerTable.contains(tableID));
setToUpdated();
auto tableStatistics = readWriteVersion->tableStatisticPerTable.at(tableID).get();
tableStatistics->setPropertyStatistics(propertyID, stats);
tableStatistics->setPropertyStatistics(propertyID, std::move(stats));
}

std::unique_ptr<MetadataDAHInfo> TablesStatistics::createMetadataDAHInfo(
Expand Down

0 comments on commit 4941eff

Please sign in to comment.