Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
DanEngelbrecht committed Feb 8, 2024
1 parent f56252c commit 15e9e25
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 111 deletions.
22 changes: 21 additions & 1 deletion lib/blockstorestorage/longtail_blockstorestorage.c
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,25 @@ static int BlockStoreStorageAPI_OpenWriteFile(
return ENOTSUP;
}

static int BlockStoreStorageAPI_OpenAppendFile(
struct Longtail_StorageAPI* storage_api,
const char* path,
Longtail_StorageAPI_HOpenFile* out_open_file)
{
MAKE_LOG_CONTEXT_FIELDS(ctx)
LONGTAIL_LOGFIELD(storage_api, "%p"),
LONGTAIL_LOGFIELD(path, "%s"),
LONGTAIL_LOGFIELD(out_open_file, "%p")
MAKE_LOG_CONTEXT_WITH_FIELDS(ctx, 0, LONGTAIL_LOG_LEVEL_OFF)

LONGTAIL_VALIDATE_INPUT(ctx, storage_api != 0, return 0)
LONGTAIL_VALIDATE_INPUT(ctx, path != 0, return 0)
LONGTAIL_VALIDATE_INPUT(ctx, out_open_file != 0, return 0)

LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "Unsupported, failed with %d", ENOTSUP)
return ENOTSUP;
}

static int BlockStoreStorageAPI_Write(
struct Longtail_StorageAPI* storage_api,
Longtail_StorageAPI_HOpenFile f,
Expand Down Expand Up @@ -1407,7 +1426,8 @@ static int BlockStoreStorageAPI_Init(
BlockStoreStorageAPI_UnlockFile,
BlockStoreStorageAPI_GetParentPath,
BlockStoreStorageAPI_MapFile,
BlockStoreStorageAPI_UnmapFile);
BlockStoreStorageAPI_UnmapFile,
BlockStoreStorageAPI_OpenAppendFile);

struct BlockStoreStorageAPI* block_store_fs = (struct BlockStoreStorageAPI*)api;

Expand Down
216 changes: 119 additions & 97 deletions lib/concurrentchunkwrite/longtail_concurrentchunkwrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ static uint64_t ConcurrentChunkWriteAPI_GetPathHash(const char* path)

struct OpenFileEntry
{
uint32_t m_TotalWriteCount;
TLongtail_Atomic64 m_PendingWriteCount;
Longtail_StorageAPI_HOpenFile m_FileHandle;
const char* m_FullPath;
TLongtail_Atomic32 m_OpenCount;
};

struct PathLookup
Expand All @@ -162,8 +162,7 @@ struct ConcurrentChunkWriteAPI
struct Longtail_StorageAPI* m_StorageAPI;
HLongtail_RWLock m_RWLock;
struct PathLookup* m_PathHashToOpenFile;
struct HandleLookup* m_FileHandleToOpenFile;
struct OpenFileEntry* m_OpenFileEntries;
struct OpenFileEntry** m_OpenFileEntries;
char* m_BasePath;
};

Expand Down Expand Up @@ -199,79 +198,98 @@ static int ConcurrentChunkWriteAPI_Open(
if (i != -1)
{
intptr_t open_file_index = api->m_PathHashToOpenFile[i].value;
const struct OpenFileEntry* open_file_entry = &api->m_OpenFileEntries[open_file_index];
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_TotalWriteCount == chunk_write_count, Longtail_UnlockRWLockRead(api->m_RWLock); return EINVAL);
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_PendingWriteCount > 0, Longtail_UnlockRWLockRead(api->m_RWLock); return EINVAL);
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_FileHandle != 0, Longtail_UnlockRWLockRead(api->m_RWLock); return EINVAL);
*out_open_file = (Longtail_ConcurrentChunkWriteAPI_HOpenFile)open_file_entry->m_FileHandle;
Longtail_UnlockRWLockRead(api->m_RWLock);
return 0;
struct OpenFileEntry* open_file_entry = api->m_OpenFileEntries[open_file_index];
if (open_file_entry->m_FileHandle != 0)
{
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_FileHandle != 0, Longtail_UnlockRWLockRead(api->m_RWLock); return EINVAL);
*out_open_file = (Longtail_ConcurrentChunkWriteAPI_HOpenFile)open_file_entry;
Longtail_AtomicAdd32(&open_file_entry->m_OpenCount, 1);
Longtail_UnlockRWLockRead(api->m_RWLock);
return 0;
}
}
Longtail_UnlockRWLockRead(api->m_RWLock);
}

char* full_asset_path = api->m_StorageAPI->ConcatPath(api->m_StorageAPI, api->m_BasePath, path);
if (full_asset_path == 0)
{
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_INFO, "ConcatPath() failed with %d", ENOMEM)
return ENOMEM;
}

int err = EnsureParentPathExists(api->m_StorageAPI, full_asset_path);
if (err != 0)
Longtail_LockRWLockWrite(api->m_RWLock);
ptrdiff_t tmp;
intptr_t i = api->m_PathHashToOpenFile ? hmgeti_ts(api->m_PathHashToOpenFile, path_hash, tmp) : -1;
if (i == -1)
{
Longtail_Free(full_asset_path);
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_INFO, "EnsureParentPathExists() failed with %d", err)
return err;
}
Longtail_UnlockRWLockWrite(api->m_RWLock);

{
Longtail_LockRWLockWrite(api->m_RWLock);
ptrdiff_t tmp;
intptr_t i = api->m_PathHashToOpenFile ? hmgeti_ts(api->m_PathHashToOpenFile, path_hash, tmp) : -1;
if (i != -1)
char* full_asset_path = api->m_StorageAPI->ConcatPath(api->m_StorageAPI, api->m_BasePath, path);
if (full_asset_path == 0)
{
intptr_t open_file_index = api->m_PathHashToOpenFile[i].value;
const struct OpenFileEntry* open_file_entry = &api->m_OpenFileEntries[open_file_index];
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_TotalWriteCount == chunk_write_count, Longtail_UnlockRWLockWrite(api->m_RWLock); Longtail_Free(full_asset_path); return EINVAL);
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_PendingWriteCount > 0, Longtail_UnlockRWLockWrite(api->m_RWLock); Longtail_Free(full_asset_path); return EINVAL);
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_FileHandle != 0, Longtail_UnlockRWLockWrite(api->m_RWLock); return EINVAL);
*out_open_file = (Longtail_ConcurrentChunkWriteAPI_HOpenFile)open_file_entry->m_FileHandle;
Longtail_UnlockRWLockWrite(api->m_RWLock);
Longtail_Free(full_asset_path);
return 0;
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_INFO, "ConcatPath() failed with %d", ENOMEM)
return ENOMEM;
}

Longtail_StorageAPI_HOpenFile r;
err = api->m_StorageAPI->OpenWriteFile(api->m_StorageAPI, full_asset_path, 0, &r);
int err = EnsureParentPathExists(api->m_StorageAPI, full_asset_path);
if (err != 0)
{
Longtail_UnlockRWLockWrite(api->m_RWLock);
Longtail_Free(full_asset_path);
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_INFO, "OpenWriteFile() failed with %d", err)
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_INFO, "EnsureParentPathExists() failed with %d", err)
return err;
}
if (chunk_write_count == 0)

Longtail_LockRWLockWrite(api->m_RWLock);
i = api->m_PathHashToOpenFile ? hmgeti_ts(api->m_PathHashToOpenFile, path_hash, tmp) : -1;
if (i == -1)
{
struct OpenFileEntry* open_file_entry = (struct OpenFileEntry*)Longtail_Alloc("ConcurrentChunkWriteAPI_Open", sizeof(struct OpenFileEntry));
if (open_file_entry == 0)
{
err = ENOMEM;
Longtail_UnlockRWLockWrite(api->m_RWLock);
Longtail_Free(full_asset_path);
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_INFO, "OpenWriteFile() failed with %d", err)
return err;
}
open_file_entry->m_FileHandle = 0;
open_file_entry->m_FullPath = full_asset_path;
open_file_entry->m_OpenCount = 1;
full_asset_path = 0;
ptrdiff_t entry_index = arrlen(api->m_OpenFileEntries);
arrput(api->m_OpenFileEntries, open_file_entry);
hmput(api->m_PathHashToOpenFile, path_hash, entry_index);

int err = api->m_StorageAPI->OpenWriteFile(api->m_StorageAPI, open_file_entry->m_FullPath, 0, &open_file_entry->m_FileHandle);
if (err != 0)
{
Longtail_UnlockRWLockWrite(api->m_RWLock);
Longtail_Free((void*)open_file_entry->m_FullPath);
Longtail_Free((void*)open_file_entry);
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_INFO, "OpenAppendFile() failed with %d", err)
return err;
}
*out_open_file = (Longtail_ConcurrentChunkWriteAPI_HOpenFile)open_file_entry;
Longtail_UnlockRWLockWrite(api->m_RWLock);
// Empty file, close immediately
api->m_StorageAPI->CloseFile(api->m_StorageAPI, r);
Longtail_Free(full_asset_path);
*out_open_file = 0;
return 0;
}
struct OpenFileEntry entry;
entry.m_FileHandle = r;
entry.m_TotalWriteCount = chunk_write_count;
entry.m_PendingWriteCount = (int64_t)chunk_write_count;
ptrdiff_t entry_index = arrlen(api->m_OpenFileEntries);
arrput(api->m_OpenFileEntries, entry);
hmput(api->m_PathHashToOpenFile, path_hash, entry_index);
hmput(api->m_FileHandleToOpenFile, r, entry_index);
*out_open_file = (Longtail_ConcurrentChunkWriteAPI_HOpenFile)r;
Longtail_Free(full_asset_path);
}

intptr_t open_file_index = api->m_PathHashToOpenFile[i].value;
struct OpenFileEntry* open_file_entry = api->m_OpenFileEntries[open_file_index];
if (open_file_entry->m_FileHandle != 0)
{
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_FileHandle != 0, Longtail_UnlockRWLockRead(api->m_RWLock); return EINVAL);
*out_open_file = (Longtail_ConcurrentChunkWriteAPI_HOpenFile)open_file_entry;
Longtail_AtomicAdd32(&open_file_entry->m_OpenCount, 1);
Longtail_UnlockRWLockWrite(api->m_RWLock);
return 0;
}
Longtail_Free(full_asset_path);
int err = api->m_StorageAPI->OpenAppendFile(api->m_StorageAPI, open_file_entry->m_FullPath, &open_file_entry->m_FileHandle);
if (err != 0)
{
Longtail_UnlockRWLockWrite(api->m_RWLock);
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_INFO, "OpenAppendFile() failed with %d", err)
return err;
}
Longtail_AtomicAdd32(&open_file_entry->m_OpenCount, 1);
*out_open_file = (Longtail_ConcurrentChunkWriteAPI_HOpenFile)open_file_entry;
Longtail_UnlockRWLockWrite(api->m_RWLock);
return 0;
}

Expand Down Expand Up @@ -299,64 +317,66 @@ static int ConcurrentChunkWriteAPI_Write(
#endif // defined(LONGTAIL_ASSERTS)

LONGTAIL_VALIDATE_INPUT(ctx, concurrent_file_write_api != 0, return EINVAL);
LONGTAIL_VALIDATE_INPUT(ctx, (uintptr_t)in_open_file != 0, return EINVAL);
LONGTAIL_VALIDATE_INPUT(ctx, in_open_file != 0, return EINVAL);
LONGTAIL_VALIDATE_INPUT(ctx, size != 0, return EINVAL);
LONGTAIL_VALIDATE_INPUT(ctx, input != 0, return EINVAL);

struct ConcurrentChunkWriteAPI* api = (struct ConcurrentChunkWriteAPI*)concurrent_file_write_api;
Longtail_StorageAPI_HOpenFile file_handle = (Longtail_StorageAPI_HOpenFile)in_open_file;
struct OpenFileEntry* open_file_entry = (struct OpenFileEntry*)in_open_file;
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_OpenCount > 0, return EINVAL)
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_FileHandle != 0, return EINVAL)
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_FullPath != 0, return EINVAL)

int err = api->m_StorageAPI->Write(api->m_StorageAPI, file_handle, offset, size, input);
int err = api->m_StorageAPI->Write(api->m_StorageAPI, open_file_entry->m_FileHandle, offset, size, input);
if (err)
{
{
Longtail_LockRWLockWrite(api->m_RWLock);
ptrdiff_t open_file_index = 0;
struct OpenFileEntry* open_file_entry = &api->m_OpenFileEntries[open_file_index];
open_file_entry->m_FileHandle = 0;
hmdel(api->m_FileHandleToOpenFile, file_handle);
Longtail_UnlockRWLockWrite(api->m_RWLock);
}
api->m_StorageAPI->CloseFile(api->m_StorageAPI, file_handle);
return err;
}
return 0;
}

static int ConcurrentChunkWriteAPI_Close(
struct Longtail_ConcurrentChunkWriteAPI* concurrent_file_write_api,
Longtail_ConcurrentChunkWriteAPI_HOpenFile in_open_file)
{
#if defined(LONGTAIL_ASSERTS)
MAKE_LOG_CONTEXT_FIELDS(ctx)
LONGTAIL_LOGFIELD(concurrent_file_write_api, "%p"),
MAKE_LOG_CONTEXT_WITH_FIELDS(ctx, 0, LONGTAIL_LOG_LEVEL_DEBUG)
#else
struct Longtail_LogContextFmt_Private* ctx = 0;
#endif // defined(LONGTAIL_ASSERTS)

LONGTAIL_VALIDATE_INPUT(ctx, concurrent_file_write_api != 0, return EINVAL);
LONGTAIL_VALIDATE_INPUT(ctx, in_open_file != 0, return EINVAL);

struct ConcurrentChunkWriteAPI* api = (struct ConcurrentChunkWriteAPI*)concurrent_file_write_api;
struct OpenFileEntry* open_file_entry = (struct OpenFileEntry*)in_open_file;

ptrdiff_t open_file_index = 0;
int close_on_write = 0;
{
Longtail_LockRWLockRead(api->m_RWLock);
ptrdiff_t tmp;
intptr_t i = api->m_PathHashToOpenFile ? hmgeti_ts(api->m_FileHandleToOpenFile, file_handle, tmp) : -1;
if (i == -1)
int32_t OpenCount = Longtail_AtomicAdd32(&open_file_entry->m_OpenCount, -1);
if (OpenCount > 0)
{
Longtail_UnlockRWLockRead(api->m_RWLock);
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "ConcurrentChunkWriteAPI_Write() file not open, error %d", EINVAL)
return EINVAL;
return 0;
}
open_file_index = api->m_FileHandleToOpenFile[i].value;
struct OpenFileEntry* open_file_entry = &api->m_OpenFileEntries[open_file_index];
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_PendingWriteCount >= (int64_t)chunk_count, Longtail_UnlockRWLockRead(api->m_RWLock); return EINVAL);
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_FileHandle == file_handle, Longtail_UnlockRWLockRead(api->m_RWLock); return EINVAL);
int64_t pending_count = Longtail_AtomicAdd64(&open_file_entry->m_PendingWriteCount, -(int64_t)chunk_count);
Longtail_UnlockRWLockRead(api->m_RWLock);
close_on_write = (pending_count == 0);
*out_chunks_remaining = (uint32_t)pending_count;
}

if (close_on_write)
{
Longtail_LockRWLockWrite(api->m_RWLock);
if (open_file_entry->m_OpenCount == 0)
{
Longtail_LockRWLockWrite(api->m_RWLock);
struct OpenFileEntry* open_file_entry = &api->m_OpenFileEntries[open_file_index];
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_PendingWriteCount == 0, Longtail_UnlockRWLockWrite(api->m_RWLock); return EINVAL);
LONGTAIL_FATAL_ASSERT(ctx, open_file_entry->m_FileHandle == file_handle, Longtail_UnlockRWLockWrite(api->m_RWLock); return EINVAL);
open_file_entry->m_FileHandle = 0;
hmdel(api->m_FileHandleToOpenFile, file_handle);
Longtail_UnlockRWLockWrite(api->m_RWLock);
if (open_file_entry->m_FileHandle)
{
api->m_StorageAPI->CloseFile(api->m_StorageAPI, open_file_entry->m_FileHandle);
open_file_entry->m_FileHandle = 0;
}
}
api->m_StorageAPI->CloseFile(api->m_StorageAPI, file_handle);
Longtail_UnlockRWLockWrite(api->m_RWLock);
}
return err;
return 0;
}

static int ConcurrentChunkWriteAPI_Flush(
Expand All @@ -376,14 +396,16 @@ static int ConcurrentChunkWriteAPI_Flush(
ptrdiff_t entry_count = arrlen(api->m_OpenFileEntries);
for (ptrdiff_t i = 0; i < entry_count; ++i)
{
Longtail_StorageAPI_HOpenFile file_handle = api->m_OpenFileEntries[i].m_FileHandle;
struct OpenFileEntry* open_file_entry = api->m_OpenFileEntries[i];
Longtail_StorageAPI_HOpenFile file_handle = open_file_entry->m_FileHandle;
if (file_handle)
{
api->m_StorageAPI->CloseFile(api->m_StorageAPI, file_handle);
}
Longtail_Free((void*)open_file_entry->m_FullPath);
Longtail_Free((void*)open_file_entry);
}
hmfree(api->m_PathHashToOpenFile);
hmfree(api->m_FileHandleToOpenFile);
arrfree(api->m_OpenFileEntries);
Longtail_UnlockRWLockWrite(api->m_RWLock);
return 0;
Expand Down Expand Up @@ -441,7 +463,7 @@ static void ConcurrentChunkWriteAPI_Dispose(struct Longtail_API* concurrent_file
LONGTAIL_LOGFIELD(concurrent_file_write_api, "%p")
MAKE_LOG_CONTEXT_WITH_FIELDS(ctx, 0, LONGTAIL_LOG_LEVEL_DEBUG)

LONGTAIL_FATAL_ASSERT(ctx, concurrent_file_write_api != 0, return);
LONGTAIL_FATAL_ASSERT(ctx, concurrent_file_write_api != 0, return);
struct ConcurrentChunkWriteAPI* api = (struct ConcurrentChunkWriteAPI*)concurrent_file_write_api;

int err = ConcurrentChunkWriteAPI_Flush(&api->m_ConcurrentChunkWriteAPI);
Expand Down Expand Up @@ -474,14 +496,14 @@ static int ConcurrentChunkWriteAPI_Init(
ConcurrentChunkWriteAPI_Dispose,
ConcurrentChunkWriteAPI_CreateDir,
ConcurrentChunkWriteAPI_Open,
ConcurrentChunkWriteAPI_Close,
ConcurrentChunkWriteAPI_Write,
ConcurrentChunkWriteAPI_Flush);

struct ConcurrentChunkWriteAPI* storage_api = (struct ConcurrentChunkWriteAPI*)api;
storage_api->m_StorageAPI = storageAPI;
Longtail_CreateRWLock(&storage_api[1], &storage_api->m_RWLock);
storage_api->m_PathHashToOpenFile = 0;
storage_api->m_FileHandleToOpenFile = 0;
storage_api->m_OpenFileEntries = 0;
storage_api->m_BasePath = Longtail_Strdup(base_path);

Expand Down
Loading

0 comments on commit 15e9e25

Please sign in to comment.