Skip to content

Commit

Permalink
Fix write operation
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Gouache committed Jul 11, 2024
1 parent 86fbfdb commit 353aaaa
Show file tree
Hide file tree
Showing 4 changed files with 412 additions and 77 deletions.
39 changes: 18 additions & 21 deletions CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,38 @@
"toolchainFile": "${sourceDir}/vcpkg/scripts/buildsystems/vcpkg.cmake"
},
{
"name": "ninja-multi",
"displayName": "Ninja without vcpkg",
"description": "Configure Ninja project files for all configurations",
"binaryDir": "${sourceDir}/builds/${presetName}",
"generator": "Ninja Multi-Config"
"name": "ninja-multi-vcpkg-debug",
"displayName": "Ninja Multi-Config with vcpkg (Debug)",
"inherits": "ninja-multi-vcpkg",
"binaryDir": "${sourceDir}/builds/ninja-multi-vcpkg",
"cacheVariables": {
"CMAKE_BUILD_TYPE": "Debug"
}
},
{
"name": "ninja-multi-vcpkg-release",
"displayName": "Ninja Multi-Config with vcpkg (Release)",
"inherits": "ninja-multi-vcpkg",
"binaryDir": "${sourceDir}/builds/ninja-multi-vcpkg",
"cacheVariables": {
"CMAKE_BUILD_TYPE": "Release"
}
}
],
"buildPresets": [
{
"name": "ninja-vcpkg-debug",
"configurePreset": "ninja-multi-vcpkg",
"configurePreset": "ninja-multi-vcpkg-debug",
"displayName": "Build (Debug)",
"description": "Build with Ninja/vcpkg (Debug)",
"configuration": "Debug"
},
{
"name": "ninja-vcpkg-release",
"configurePreset": "ninja-multi-vcpkg",
"configurePreset": "ninja-multi-vcpkg-release",
"displayName": "Build (Release)",
"description": "Build with Ninja/vcpkg (Release)",
"configuration": "Release"
},
{
"name": "ninja-debug",
"configurePreset": "ninja-multi",
"displayName": "Build (Debug)",
"description": "Build with Ninja",
"configuration": "Debug"
},
{
"name": "ninja-release",
"configurePreset": "ninja-multi",
"displayName": "Build (release)",
"description": "Build with Ninja",
"configuration": "Release"
}
],
"testPresets": [
Expand Down
157 changes: 102 additions & 55 deletions src/gcsplugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <memory>

#include <limits.h>
#include <google/cloud/storage/object_write_stream.h>

constexpr const char* version = "0.1.0";
constexpr const char* driver_name = "GCS driver";
Expand Down Expand Up @@ -51,10 +52,17 @@ struct MultiPartFile
tOffset total_size_{ 0 };
};

struct WriteFile

Check warning on line 55 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

'WriteFile': copy constructor was implicitly defined as deleted

Check warning on line 55 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

'WriteFile': assignment operator was implicitly defined as deleted
{
std::string bucketname_;
std::string filename_;
google::cloud::storage::ObjectWriteStream writer_;
};

enum class HandleType { kRead, kWrite, kAppend };

using ReaderPtr = std::unique_ptr<MultiPartFile>;
using WriterPtr = std::unique_ptr<gcs::ObjectWriteStream>;
using WriterPtr = std::unique_ptr<WriteFile>;

union ClientVariant

Check warning on line 67 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

'ClientVariant': copy constructor was implicitly defined as deleted

Check warning on line 67 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

'ClientVariant': assignment operator was implicitly defined as deleted
{
Expand Down Expand Up @@ -103,7 +111,7 @@ struct Handle
}

MultiPartFile& Reader() { return *(var.reader); }
gcs::ObjectWriteStream& Writer() { return *(var.writer); }
WriteFile& Writer() { return *(var.writer); }
};

using HandlePtr = std::unique_ptr<Handle>;
Expand Down Expand Up @@ -175,24 +183,6 @@ long long int DownloadFileRangeToBuffer(const std::string& bucket_name,
return num_read;
}

bool UploadBufferToGcs(const std::string& bucket_name,
const std::string& object_name,
const char* buffer,
std::size_t buffer_size) {

auto writer = client.WriteObject(bucket_name, object_name);
writer.write(buffer, buffer_size);
writer.Close();

auto status = writer.metadata();
if (!status) {
spdlog::error("Error during upload: {} {}", (int)(status.status().code()), status.status().message());
return false;
}

return true;
}

bool ParseGcsUri(const std::string& gcs_uri, std::string& bucket_name, std::string& object_name)
{
char const* prefix = "gs://";
Expand Down Expand Up @@ -410,7 +400,12 @@ int driver_fileExists(const char* sFilePathName)

auto status_or_metadata_list = client.ListObjects(bucket_name, gcs::MatchGlob{ object_name });
const auto first_item_it = status_or_metadata_list.begin();
if ((first_item_it == status_or_metadata_list.end()) || !(*first_item_it))
if (first_item_it == status_or_metadata_list.end())
{
spdlog::debug("Object does not exist");
return kFalse;
}
if (!(*first_item_it))
{
spdlog::error("Error checking object");
return kFalse;
Expand Down Expand Up @@ -606,6 +601,7 @@ int AccumulateNamesAndSizes(MultiPartFile& h)
}
}
}
h.commonHeaderLength_ = header_size;

h.total_size_ = *(h.cumulativeSize_.rbegin());

Expand Down Expand Up @@ -651,7 +647,14 @@ void* driver_fopen(const char* filename, char mode)
}
case 'w':
{
active_handles.push_back(MakeHandlePtrFromWriter({}));
auto writer = client.WriteObject(bucketname, objectname);
if (!writer) {
spdlog::error("Error initializing write stream: {} {}", (int)(writer.metadata().status().code()), writer.metadata().status().message());
return 0;
}
WriterPtr writer_struct{ new WriteFile };
writer_struct->writer_ = std::move(writer);
active_handles.push_back(MakeHandlePtrFromWriter(std::move(writer_struct)));
break;
}
case 'a':
Expand All @@ -675,6 +678,16 @@ int driver_fclose(void* stream)

spdlog::debug("fclose {}", (void*)stream);

Handle* stream_h = reinterpret_cast<Handle*>(stream);
if (HandleType::kWrite == stream_h->type)
{
stream_h->var.writer->writer_.Close();
auto status = stream_h->var.writer->writer_.metadata();
if (!status) {
spdlog::error("Error during upload: {} {}", (int)(status.status().code()), status.status().message());
}
}

return EraseRemove(stream);
}

Expand Down Expand Up @@ -896,9 +909,20 @@ long long int driver_fwrite(const void* ptr, size_t size, size_t count, void* st
spdlog::debug("fwrite {} {} {} {}", ptr, size, count, stream);

assert(stream != NULL);
MultiPartFile* h = (MultiPartFile*)stream;
Handle* stream_h = reinterpret_cast<Handle*>(stream);
if (HandleType::kWrite != stream_h->type)
{
spdlog::error("Cannot write on not writing stream");
return -1;
}

UploadBufferToGcs(h->bucketname_, h->filename_, (char*)ptr, size * count);
stream_h->var.writer->writer_.write((const char*)ptr, size * count);
if (stream_h->var.writer->writer_.bad()) {
auto status = stream_h->var.writer->writer_.last_status();
spdlog::error("Error during upload: {} {}", (int)(status.code()), status.message());
return -1;
}
spdlog::debug("Write status after write: good {}, bad {}, fail {}, goodbit {}", stream_h->var.writer->writer_.good(), stream_h->var.writer->writer_.bad(), stream_h->var.writer->writer_.fail(), (int)(stream_h->var.writer->writer_.goodbit));

// TODO proper error handling...
return size * count;
Expand All @@ -922,10 +946,10 @@ int driver_remove(const char* filename)
auto status = client.DeleteObject(bucket_name, object_name);
if (!status.ok()) {
spdlog::error("Error deleting object: {} {}", (int)(status.code()), status.message());
return 0;
return kFailure;
}

return 1;
return kSuccess;
}

int driver_rmdir(const char* filename)
Expand Down Expand Up @@ -964,10 +988,12 @@ int driver_copyToLocal(const char* sSourceFilePathName, const char* sDestFilePat
ParseGcsUri(sSourceFilePathName, bucket_name, object_name);
FallbackToDefaultBucket(bucket_name);

// Create a ReadObject stream
auto reader = client.ReadObject(bucket_name, object_name);
if (!reader) {
spdlog::error("Error initializing download stream: {} {}", (int)(reader.status().code()), reader.status().message());
ReaderPtr reader_struct{ new MultiPartFile };
reader_struct->bucketname_ = bucket_name;
reader_struct->filename_ = object_name;
if (kFailure == AccumulateNamesAndSizes(*reader_struct))
{
//reader_struct is unusable
return false;
}

Expand All @@ -978,38 +1004,59 @@ int driver_copyToLocal(const char* sSourceFilePathName, const char* sDestFilePat
return false;
}

// Read from the GCS object and write to the local file
std::string buffer(1024, '\0');
spdlog::debug("Start reading {}", buffer);
int status = kSuccess;

bool complete = false;
while (!complete) {
reader.read(&buffer[0], buffer.size());
for (uint i=0; i<reader_struct->filenames_.size() && status == kSuccess; i++) {

Check failure on line 1009 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

'uint': undeclared identifier

Check failure on line 1009 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

syntax error: missing ';' before identifier 'i'

Check failure on line 1009 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

syntax error: missing ';' before '='

Check failure on line 1009 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

syntax error: missing ')' before '='

Check failure on line 1009 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

syntax error: '='

Check failure on line 1009 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

'i': undeclared identifier

Check failure on line 1009 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

'i': undeclared identifier

Check failure on line 1009 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

syntax error: ')'
spdlog::debug("copyToLocal processing file {} = {}", i, reader_struct->filenames_[i].c_str());

Check failure on line 1010 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

'i': undeclared identifier

Check failure on line 1010 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / windows-latest-hosted-ninja-vcpkg_submod-autocache

'i': undeclared identifier

if (reader.bad()) {
spdlog::error("Error during read: {} {}", (int)(reader.status().code()), reader.status().message());
complete = true;
// Create a ReadObject stream
auto reader = client.ReadObject(bucket_name, reader_struct->filenames_[i].c_str());
if (!reader) {
spdlog::error("Error initializing download stream: {} {}", (int)(reader.status().code()), reader.status().message());
status = kFailure;
break;
}
spdlog::debug("Read {}", reader.gcount());

if (reader.gcount() > 0) {
file_stream.write(buffer.data(), reader.gcount());
}
else {
complete = true;
// Read from the GCS object and write to the local file
std::string buffer(1024, '\0');
spdlog::debug("Start reading {}", buffer);

bool complete = false;
bool headerlineSkipped = false;
while (!complete) {
reader.read(&buffer[0], buffer.size());

if (reader.bad()) {
spdlog::error("Error during read: {} {}", (int)(reader.status().code()), reader.status().message());
complete = true;
}
spdlog::debug("Read {}", reader.gcount());

if (reader.gcount() > 0) {
char *buf_ptr = buffer.data();

Check failure on line 1036 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / macos-13-hosted-ninja-vcpkg_submod-autocache

cannot initialize a variable of type 'char *' with an rvalue of type 'const value_type *' (aka 'const char *')

Check failure on line 1036 in src/gcsplugin.cpp

View workflow job for this annotation

GitHub Actions / macos-14-hosted-ninja-vcpkg_submod-autocache

cannot initialize a variable of type 'char *' with an rvalue of type 'const value_type *' (aka 'const char *')
std::streamsize num_bytes = reader.gcount();
if (i > 0 && !headerlineSkipped && reader_struct->commonHeaderLength_ > 0) {
// TODO: check bounds!
spdlog::debug("Skipping initial {} bytes", reader_struct->commonHeaderLength_);
buf_ptr += reader_struct->commonHeaderLength_;
num_bytes -= reader_struct->commonHeaderLength_;
headerlineSkipped = true;
}
file_stream.write(buf_ptr, num_bytes);
}
else {
complete = true;
}

}

}
spdlog::debug("Close output");
file_stream.close();

if (!reader.status().ok()) {
std::cerr << "Error during download: " << reader.status() << "\n";
return 0;
}
spdlog::debug("Done copying");

return 1;
return status;
}

int driver_copyFromLocal(const char* sSourceFilePathName, const char* sDestFilePathName)
Expand All @@ -1027,14 +1074,14 @@ int driver_copyFromLocal(const char* sSourceFilePathName, const char* sDestFileP
std::ifstream file_stream(sSourceFilePathName, std::ios::binary);
if (!file_stream.is_open()) {
spdlog::error("Failed to open local file: {}", sSourceFilePathName);
return false;
return kFailure;
}

// Create a WriteObject stream
auto writer = client.WriteObject(bucket_name, object_name, gcs::IfGenerationMatch(0));
auto writer = client.WriteObject(bucket_name, object_name);
if (!writer) {
spdlog::error("Error initializing upload stream: {} {}", (int)(writer.metadata().status().code()), writer.metadata().status().message());
return false;
return kFailure;
}

// Read from the local file and write to the GCS object
Expand All @@ -1052,8 +1099,8 @@ int driver_copyFromLocal(const char* sSourceFilePathName, const char* sDestFileP
if (!status) {
spdlog::error("Error during file upload: {} {}", (int)(status.status().code()), status.status().message());

return false;
return kFailure;
}

return true;
return kSuccess;
}
2 changes: 1 addition & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ FetchContent_MakeAvailable(googletest)

include(GoogleTest)

add_executable(basic_test basic_test.cpp)
add_executable(basic_test basic_test.cpp drivertest.cpp)

target_compile_options(basic_test
PRIVATE $<$<CXX_COMPILER_ID:MSVC>:-Wall>
Expand Down
Loading

0 comments on commit 353aaaa

Please sign in to comment.