From 353aaaa1f1d71a1094e95470a76a2d7548dc0a69 Mon Sep 17 00:00:00 2001 From: Stephane Gouache Date: Thu, 11 Jul 2024 18:05:27 +0200 Subject: [PATCH] Fix write operation --- CMakePresets.json | 39 +++--- src/gcsplugin.cpp | 157 +++++++++++++++--------- test/CMakeLists.txt | 2 +- test/drivertest.cpp | 291 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 412 insertions(+), 77 deletions(-) create mode 100644 test/drivertest.cpp diff --git a/CMakePresets.json b/CMakePresets.json index 9ceb982..ccb4aa8 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -15,41 +15,38 @@ "toolchainFile": "${sourceDir}/vcpkg/scripts/buildsystems/vcpkg.cmake" }, { - "name": "ninja-multi", - "displayName": "Ninja without vcpkg", - "description": "Configure Ninja project files for all configurations", - "binaryDir": "${sourceDir}/builds/${presetName}", - "generator": "Ninja Multi-Config" + "name": "ninja-multi-vcpkg-debug", + "displayName": "Ninja Multi-Config with vcpkg (Debug)", + "inherits": "ninja-multi-vcpkg", + "binaryDir": "${sourceDir}/builds/ninja-multi-vcpkg", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug" + } + }, + { + "name": "ninja-multi-vcpkg-release", + "displayName": "Ninja Multi-Config with vcpkg (Release)", + "inherits": "ninja-multi-vcpkg", + "binaryDir": "${sourceDir}/builds/ninja-multi-vcpkg", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Release" + } } ], "buildPresets": [ { "name": "ninja-vcpkg-debug", - "configurePreset": "ninja-multi-vcpkg", + "configurePreset": "ninja-multi-vcpkg-debug", "displayName": "Build (Debug)", "description": "Build with Ninja/vcpkg (Debug)", "configuration": "Debug" }, { "name": "ninja-vcpkg-release", - "configurePreset": "ninja-multi-vcpkg", + "configurePreset": "ninja-multi-vcpkg-release", "displayName": "Build (Release)", "description": "Build with Ninja/vcpkg (Release)", "configuration": "Release" - }, - { - "name": "ninja-debug", - "configurePreset": "ninja-multi", - "displayName": "Build (Debug)", - 
"description": "Build with Ninja", - "configuration": "Debug" - }, - { - "name": "ninja-release", - "configurePreset": "ninja-multi", - "displayName": "Build (release)", - "description": "Build with Ninja", - "configuration": "Release" } ], "testPresets": [ diff --git a/src/gcsplugin.cpp b/src/gcsplugin.cpp index 3b577b0..b8a67a0 100644 --- a/src/gcsplugin.cpp +++ b/src/gcsplugin.cpp @@ -17,6 +17,7 @@ #include #include +#include constexpr const char* version = "0.1.0"; constexpr const char* driver_name = "GCS driver"; @@ -51,10 +52,17 @@ struct MultiPartFile tOffset total_size_{ 0 }; }; +struct WriteFile +{ + std::string bucketname_; + std::string filename_; + google::cloud::storage::ObjectWriteStream writer_; +}; + enum class HandleType { kRead, kWrite, kAppend }; using ReaderPtr = std::unique_ptr; -using WriterPtr = std::unique_ptr; +using WriterPtr = std::unique_ptr; union ClientVariant { @@ -103,7 +111,7 @@ struct Handle } MultiPartFile& Reader() { return *(var.reader); } - gcs::ObjectWriteStream& Writer() { return *(var.writer); } + WriteFile& Writer() { return *(var.writer); } }; using HandlePtr = std::unique_ptr; @@ -175,24 +183,6 @@ long long int DownloadFileRangeToBuffer(const std::string& bucket_name, return num_read; } -bool UploadBufferToGcs(const std::string& bucket_name, - const std::string& object_name, - const char* buffer, - std::size_t buffer_size) { - - auto writer = client.WriteObject(bucket_name, object_name); - writer.write(buffer, buffer_size); - writer.Close(); - - auto status = writer.metadata(); - if (!status) { - spdlog::error("Error during upload: {} {}", (int)(status.status().code()), status.status().message()); - return false; - } - - return true; -} - bool ParseGcsUri(const std::string& gcs_uri, std::string& bucket_name, std::string& object_name) { char const* prefix = "gs://"; @@ -410,7 +400,12 @@ int driver_fileExists(const char* sFilePathName) auto status_or_metadata_list = client.ListObjects(bucket_name, gcs::MatchGlob{ 
object_name }); const auto first_item_it = status_or_metadata_list.begin(); - if ((first_item_it == status_or_metadata_list.end()) || !(*first_item_it)) + if (first_item_it == status_or_metadata_list.end()) + { + spdlog::debug("Object does not exist"); + return kFalse; + } + if (!(*first_item_it)) { spdlog::error("Error checking object"); return kFalse; @@ -606,6 +601,7 @@ int AccumulateNamesAndSizes(MultiPartFile& h) } } } + h.commonHeaderLength_ = header_size; h.total_size_ = *(h.cumulativeSize_.rbegin()); @@ -651,7 +647,14 @@ void* driver_fopen(const char* filename, char mode) } case 'w': { - active_handles.push_back(MakeHandlePtrFromWriter({})); + auto writer = client.WriteObject(bucketname, objectname); + if (!writer) { + spdlog::error("Error initializing write stream: {} {}", (int)(writer.metadata().status().code()), writer.metadata().status().message()); + return 0; + } + WriterPtr writer_struct{ new WriteFile }; + writer_struct->writer_ = std::move(writer); + active_handles.push_back(MakeHandlePtrFromWriter(std::move(writer_struct))); break; } case 'a': @@ -675,6 +678,16 @@ int driver_fclose(void* stream) spdlog::debug("fclose {}", (void*)stream); + Handle* stream_h = reinterpret_cast(stream); + if (HandleType::kWrite == stream_h->type) + { + stream_h->var.writer->writer_.Close(); + auto status = stream_h->var.writer->writer_.metadata(); + if (!status) { + spdlog::error("Error during upload: {} {}", (int)(status.status().code()), status.status().message()); + } + } + return EraseRemove(stream); } @@ -896,9 +909,20 @@ long long int driver_fwrite(const void* ptr, size_t size, size_t count, void* st spdlog::debug("fwrite {} {} {} {}", ptr, size, count, stream); assert(stream != NULL); - MultiPartFile* h = (MultiPartFile*)stream; + Handle* stream_h = reinterpret_cast(stream); + if (HandleType::kWrite != stream_h->type) + { + spdlog::error("Cannot write on not writing stream"); + return -1; + } - UploadBufferToGcs(h->bucketname_, h->filename_, (char*)ptr, size * 
count); + stream_h->var.writer->writer_.write((const char*)ptr, size * count); + if (stream_h->var.writer->writer_.bad()) { + auto status = stream_h->var.writer->writer_.last_status(); + spdlog::error("Error during upload: {} {}", (int)(status.code()), status.message()); + return -1; + } + spdlog::debug("Write status after write: good {}, bad {}, fail {}, goodbit {}", stream_h->var.writer->writer_.good(), stream_h->var.writer->writer_.bad(), stream_h->var.writer->writer_.fail(), (int)(stream_h->var.writer->writer_.goodbit)); // TODO proper error handling... return size * count; @@ -922,10 +946,10 @@ int driver_remove(const char* filename) auto status = client.DeleteObject(bucket_name, object_name); if (!status.ok()) { spdlog::error("Error deleting object: {} {}", (int)(status.code()), status.message()); - return 0; + return kFailure; } - return 1; + return kSuccess; } int driver_rmdir(const char* filename) @@ -964,10 +988,12 @@ int driver_copyToLocal(const char* sSourceFilePathName, const char* sDestFilePat ParseGcsUri(sSourceFilePathName, bucket_name, object_name); FallbackToDefaultBucket(bucket_name); - // Create a ReadObject stream - auto reader = client.ReadObject(bucket_name, object_name); - if (!reader) { - spdlog::error("Error initializing download stream: {} {}", (int)(reader.status().code()), reader.status().message()); + ReaderPtr reader_struct{ new MultiPartFile }; + reader_struct->bucketname_ = bucket_name; + reader_struct->filename_ = object_name; + if (kFailure == AccumulateNamesAndSizes(*reader_struct)) + { + //reader_struct is unusable return false; } @@ -978,38 +1004,59 @@ int driver_copyToLocal(const char* sSourceFilePathName, const char* sDestFilePat return false; } - // Read from the GCS object and write to the local file - std::string buffer(1024, '\0'); - spdlog::debug("Start reading {}", buffer); + int status = kSuccess; - bool complete = false; - while (!complete) { - reader.read(&buffer[0], buffer.size()); + for (uint i=0; 
ifilenames_.size() && status == kSuccess; i++) { + spdlog::debug("copyToLocal processing file {} = {}", i, reader_struct->filenames_[i].c_str()); - if (reader.bad()) { - spdlog::error("Error during read: {} {}", (int)(reader.status().code()), reader.status().message()); - complete = true; + // Create a ReadObject stream + auto reader = client.ReadObject(bucket_name, reader_struct->filenames_[i].c_str()); + if (!reader) { + spdlog::error("Error initializing download stream: {} {}", (int)(reader.status().code()), reader.status().message()); + status = kFailure; + break; } - spdlog::debug("Read {}", reader.gcount()); - if (reader.gcount() > 0) { - file_stream.write(buffer.data(), reader.gcount()); - } - else { - complete = true; + // Read from the GCS object and write to the local file + std::string buffer(1024, '\0'); + spdlog::debug("Start reading {}", buffer); + + bool complete = false; + bool headerlineSkipped = false; + while (!complete) { + reader.read(&buffer[0], buffer.size()); + + if (reader.bad()) { + spdlog::error("Error during read: {} {}", (int)(reader.status().code()), reader.status().message()); + complete = true; + } + spdlog::debug("Read {}", reader.gcount()); + + if (reader.gcount() > 0) { + char *buf_ptr = buffer.data(); + std::streamsize num_bytes = reader.gcount(); + if (i > 0 && !headerlineSkipped && reader_struct->commonHeaderLength_ > 0) { + // TODO: check bounds! 
+ spdlog::debug("Skipping initial {} bytes", reader_struct->commonHeaderLength_); + buf_ptr += reader_struct->commonHeaderLength_; + num_bytes -= reader_struct->commonHeaderLength_; + headerlineSkipped = true; + } + file_stream.write(buf_ptr, num_bytes); + } + else { + complete = true; + } + } } spdlog::debug("Close output"); file_stream.close(); - if (!reader.status().ok()) { - std::cerr << "Error during download: " << reader.status() << "\n"; - return 0; - } spdlog::debug("Done copying"); - return 1; + return status; } int driver_copyFromLocal(const char* sSourceFilePathName, const char* sDestFilePathName) @@ -1027,14 +1074,14 @@ int driver_copyFromLocal(const char* sSourceFilePathName, const char* sDestFileP std::ifstream file_stream(sSourceFilePathName, std::ios::binary); if (!file_stream.is_open()) { spdlog::error("Failed to open local file: {}", sSourceFilePathName); - return false; + return kFailure; } // Create a WriteObject stream - auto writer = client.WriteObject(bucket_name, object_name, gcs::IfGenerationMatch(0)); + auto writer = client.WriteObject(bucket_name, object_name); if (!writer) { spdlog::error("Error initializing upload stream: {} {}", (int)(writer.metadata().status().code()), writer.metadata().status().message()); - return false; + return kFailure; } // Read from the local file and write to the GCS object @@ -1052,8 +1099,8 @@ int driver_copyFromLocal(const char* sSourceFilePathName, const char* sDestFileP if (!status) { spdlog::error("Error during file upload: {} {}", (int)(status.status().code()), status.status().message()); - return false; + return kFailure; } - return true; + return kSuccess; } \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0426bfd..cc6cba4 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -10,7 +10,7 @@ FetchContent_MakeAvailable(googletest) include(GoogleTest) -add_executable(basic_test basic_test.cpp) +add_executable(basic_test basic_test.cpp drivertest.cpp) 
target_compile_options(basic_test PRIVATE $<$:-Wall> diff --git a/test/drivertest.cpp b/test/drivertest.cpp new file mode 100644 index 0000000..a0d3904 --- /dev/null +++ b/test/drivertest.cpp @@ -0,0 +1,291 @@ +#include + +#include "../src/gcsplugin.h" + +/* functions prototype */ +int test(const char *file_name_input, const char *file_name_output, const char *file_name_local, int nBufferSize); +int launch_test(const char *inputFilename, const char *outputFilename, const char *localOutput, int nBufferSize); +int copyFile(const char *file_name_input, const char *file_name_output, int nBufferSize); +int copyFileWithFseek(const char *file_name_input, const char *file_name_output, int nBufferSize); +int removeFile(const char *filename); +int compareSize(const char *file_name_output, long long int filesize); + +constexpr int kSuccess{ 1 }; +constexpr int kFailure{ 0 }; + +TEST(GCSDriverTest, End2EndTest_SingleFile_512KB_OK) +{ + const char* inputFilename = "gs://data-test-khiops-driver-gcs/khiops_data/bq_export/Adult/Adult-split-000000000001.txt"; + const char* outputFilename = "gs://data-test-khiops-driver-gcs/khiops_data/output/new_output.txt"; + const char* localOutput = "/tmp/out1.txt"; + + /* default size of buffer passed to driver */ + int nBufferSize = 512 * 1024; + + /* error indicator in case of error */ + int test_status = launch_test(inputFilename, outputFilename, localOutput, nBufferSize); + ASSERT_EQ(test_status, kSuccess); +} + +TEST(GCSDriverTest, End2EndTest_MultipartFile_512KB_OK) +{ + const char* inputFilename = "gs://data-test-khiops-driver-gcs/khiops_data/bq_export/Adult/Adult-split-00000000000*.txt"; + const char* outputFilename = "gs://data-test-khiops-driver-gcs/khiops_data/output/new_output2.txt"; + const char* localOutput = "/tmp/out2.txt"; + + /* default size of buffer passed to driver */ + int nBufferSize = 512 * 1024; + + /* error indicator in case of error */ + int test_status = launch_test(inputFilename, outputFilename, localOutput, 
nBufferSize); + ASSERT_EQ(test_status, kSuccess); +} + +int launch_test(const char *inputFilename, const char *outputFilename, const char *localOutput, int nBufferSize) +{ + int test_status = kSuccess; + + // Connection to the file system + bool bIsconnected = driver_connect(); + if (bIsconnected) + { + if (!driver_isConnected()) + { + test_status = kFailure; + fprintf(stderr, "ERROR : connection is done but driver is not connected\n"); + } + if (!driver_fileExists(inputFilename)) + { + fprintf(stderr, "ERROR : %s is missing\n", inputFilename); + test_status = kFailure; + } + // The real test begins here + if (test_status == kSuccess) + { + test_status = test(inputFilename, outputFilename, localOutput, nBufferSize); + } + driver_disconnect(); + } + else + { + test_status = kFailure; + fprintf(stderr, "ERROR : unable to connect to the file system\n"); + } + + if (test_status == kFailure) + { + printf("Test has failed\n"); + } + + return test_status; +} + +/* functions definitions */ + +int test(const char *file_name_input, const char *file_name_output, const char *file_name_local, int nBufferSize) +{ + // Basic information of the scheme + printf("scheme: %s\n", driver_getScheme()); + printf("is read-only: %d\n", driver_isReadOnly()); + + // Checks size of input file + long long int filesize = driver_getFileSize(file_name_input); + printf("size of %s is %lld\n", file_name_input, filesize); + + if (driver_fileExists(file_name_input) == 1) + printf("%s exists\n", file_name_input); + else + { + printf("%s is missing, abort\n", file_name_input); + return kFailure; + } + + int copy_status = kSuccess; + + // Test copying files + printf("Copy %s to %s\n", file_name_input, file_name_output); + copy_status = copyFile(file_name_input, file_name_output, nBufferSize); + if (copy_status == kSuccess) + { + compareSize(file_name_output, filesize); + //removeFile(file_name_output); + } + + // Test copying files with fseek + printf("Copy with fseek %s to %s ...\n", file_name_input, 
file_name_output); + copy_status = copyFileWithFseek(file_name_input, file_name_output, nBufferSize); + if (copy_status == kSuccess) + { + compareSize(file_name_output, filesize); + removeFile(file_name_output); + } + + // Copy to local + if (copy_status == kSuccess) + { + printf("Copy to local %s to %s ...\n", file_name_input, file_name_local); + copy_status = driver_copyToLocal(file_name_input, file_name_local); + if (copy_status != kSuccess) + printf("Error while copying : %s\n", driver_getlasterror()); + else + printf("copy %s to local is done\n", file_name_input); + } + + // Copy from local + if (copy_status == kSuccess) + { + printf("Copy from local %s to %s ...\n", file_name_local, file_name_output); + copy_status = driver_copyFromLocal(file_name_local, file_name_output); + if (copy_status != kSuccess) + printf("Error while copying : %s\n", driver_getlasterror()); + else + printf("copy %s from local is done\n", file_name_local); + if (!driver_fileExists(file_name_output)) + { + printf("%s is missing !\n", file_name_output); + copy_status = kFailure; + } + } + + return copy_status; +} + +// Copy file_name_input to file_name_output by steps of nBufferSize bytes +int copyFile(const char *file_name_input, const char *file_name_output, int nBufferSize) +{ + // Opens for read + void *fileinput = driver_fopen(file_name_input, 'r'); + if (fileinput == NULL) + { + printf("error : %s : %s\n", file_name_input, driver_getlasterror()); + return kFailure; + } + + int copy_status = kSuccess; + void *fileoutput = driver_fopen(file_name_output, 'w'); + if (fileoutput == NULL) + { + printf("error : %s : %s\n", file_name_input, driver_getlasterror()); + copy_status = kFailure; + } + + if (copy_status == kSuccess) + { + // Reads the file by steps of nBufferSize and writes to the output file at each step + char *buffer = new char[nBufferSize + 1](); + long long int sizeRead = nBufferSize; + long long int sizeWrite; + driver_fseek(fileinput, 0, SEEK_SET); + while (sizeRead == nBufferSize && 
copy_status == kSuccess) + { + sizeRead = driver_fread(buffer, sizeof(char), nBufferSize, fileinput); + if (sizeRead == -1) + { + copy_status = kFailure; + printf("error while reading %s : %s\n", file_name_input, driver_getlasterror()); + } + else + { + sizeWrite = driver_fwrite(buffer, sizeof(char), (size_t)sizeRead, fileoutput); + if (sizeWrite == -1) + { + copy_status = kFailure; + printf("error while writing %s : %s\n", file_name_output, driver_getlasterror()); + } + } + } + driver_fclose(fileoutput); + delete[](buffer); + } + driver_fclose(fileinput); + return copy_status; +} + +// Copy file_name_input to file_name_output by steps of nBufferSize bytes, calling fseek before each read +int copyFileWithFseek(const char *file_name_input, const char *file_name_output, int nBufferSize) +{ + // Opens for read + void *fileinput = driver_fopen(file_name_input, 'r'); + if (fileinput == NULL) + { + printf("error : %s : %s\n", file_name_input, driver_getlasterror()); + return kFailure; + } + + int copy_status = kSuccess; + void *fileoutput = driver_fopen(file_name_output, 'w'); + if (fileoutput == NULL) + { + printf("error : %s : %s\n", file_name_input, driver_getlasterror()); + copy_status = kFailure; + } + + if (copy_status == kSuccess) + { + // Reads the file by steps of nBufferSize and writes to the output file at each step + char *buffer = new char[nBufferSize+1](); + long long int sizeRead = nBufferSize; + long long int sizeWrite; + int cummulativeRead = 0; + driver_fseek(fileinput, 0, SEEK_SET); + while (sizeRead == nBufferSize && copy_status == kSuccess) + { + driver_fseek(fileinput, cummulativeRead, SEEK_SET); + sizeRead = driver_fread(buffer, sizeof(char), nBufferSize, fileinput); + cummulativeRead += sizeRead; + if (sizeRead == -1) + { + copy_status = kFailure; + printf("error while reading %s : %s\n", file_name_input, driver_getlasterror()); + } + else + { + sizeWrite = driver_fwrite(buffer, sizeof(char), (size_t)sizeRead, fileoutput); + if (sizeWrite == -1) + { + 
copy_status = kFailure; + printf("error while writing %s : %s\n", file_name_output, driver_getlasterror()); + } + } + } + driver_fclose(fileoutput); + delete[](buffer); + } + driver_fclose(fileinput); + return copy_status; +} + +int removeFile(const char *filename) +{ + int remove_status = driver_remove(filename); + if (remove_status != kSuccess) + printf("Error while removing : %s\n", driver_getlasterror()); + if (driver_fileExists(filename)) + { + printf("File %s should be removed !\n", filename); + remove_status = kFailure; + } + return remove_status; +} + +int compareSize(const char *file_name_output, long long int filesize) +{ + int compare_status = kSuccess; + long long int filesize_output = driver_getFileSize(file_name_output); + printf("size of %s is %lld\n", file_name_output, filesize_output); + if (filesize_output != filesize) + { + printf("Sizes of input and output are different\n"); + compare_status = kFailure; + } + if (driver_fileExists(file_name_output)) + { + printf("File %s exists\n", file_name_output); + } + else + { + printf("Something's wrong : %s is missing\n", file_name_output); + compare_status = kFailure; + } + return compare_status; +}