Skip to content

Commit

Permalink
Request bigger chunks from QLever; log processing speed
Browse files (browse the repository at this point in the history)
  • Loading branch information
patrickbr committed Oct 11, 2024
1 parent 5a0f4d0 commit 616440b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 27 deletions.
64 changes: 37 additions & 27 deletions src/qlever-petrimaps/GeomCache.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -103,6 +103,7 @@ size_t GeomCache::writeCbString(void *contents, size_t size, size_t nmemb,
size_t GeomCache::writeCb(void *contents, size_t size, size_t nmemb,
void *userp) {
size_t realsize = size * nmemb;

try {
static_cast<GeomCache *>(userp)->parse(static_cast<const char *>(contents),
realsize);
Expand Down Expand Up @@ -145,9 +146,11 @@ size_t GeomCache::writeCbCount(void *contents, size_t size, size_t nmemb,
void GeomCache::parse(const char *c, size_t size) {
_loadStatusStage = _LoadStatusStages::Parse;

_lastBytesReceived += size;

const char *start = c;
while (c < start + size) {
if (_raw.size() < 10000) _raw.push_back(*c);
if (_raw.size() < 1000) _raw.push_back(*c);
switch (_state) {
case IN_HEADER:
if (*c == '\n') {
Expand All @@ -170,8 +173,7 @@ void GeomCache::parse(const char *c, size_t size) {
_qidToIdF.write(reinterpret_cast<const char *>(&idm),
sizeof(IdMapping));
_qidToIdFSize++;
} else if ((p = _dangling.rfind("POINT(", 1)) !=
std::string::npos) {
} else if ((p = _dangling.rfind("POINT(", 1)) != std::string::npos) {
_curUniqueGeom++;
size_t i = 0;
p = parseMultiPoint(_dangling, p + 4, std::string::npos, &i);
Expand Down Expand Up @@ -269,15 +271,15 @@ void GeomCache::parse(const char *c, size_t size) {
&j);
}

if (memcmp(_dangling.c_str() + starts[i], "POLYGON(", 8) == 0) {
p = parsePolygon(_dangling, starts[i] + 7, starts[i + 1], &j);
}
if (memcmp(_dangling.c_str() + starts[i], "POLYGON(", 8) == 0) {
p = parsePolygon(_dangling, starts[i] + 7, starts[i + 1], &j);
}

if (memcmp(_dangling.c_str() + starts[i], "LINESTRING(", 11) ==
0) {
p = parseMultiLineString(_dangling, starts[i] + 9,
starts[i + 1], &j);
}
if (memcmp(_dangling.c_str() + starts[i], "LINESTRING(", 11) ==
0) {
p = parseMultiLineString(_dangling, starts[i] + 9,
starts[i + 1], &j);
}
}

// dummy element to keep sync
Expand Down Expand Up @@ -306,14 +308,20 @@ void GeomCache::parse(const char *c, size_t size) {
<< "%, " << _pointsFSize << " points, " << _linesFSize
<< " (open) polygons (with " << _linePointsFSize
<< " points), " << _geometryDuplicates
<< " duplicates)";
<< " duplicates, "
<< ((_lastBytesReceived / (1024.0 * 1024.0)) /
(TOOK(_lastReceivedTime) / 1000000000.0))
<< " MB/s)";

_lastReceivedTime = TIME();
_lastBytesReceived = 0;
}
_prev = _dangling;
_prev = std::move(_dangling);
_dangling.clear();
c++;
continue;
} else {
_prev = _dangling;
_prev = std::move(_dangling);
_dangling.clear();
c++;
continue;
Expand Down Expand Up @@ -385,7 +393,7 @@ size_t GeomCache::getCurrentProgress() { return _curRow; }
// _____________________________________________________________________________
void GeomCache::parseIds(const char *c, size_t size) {
for (size_t i = 0; i < size; i++) {
if (_raw.size() < 10000) _raw.push_back(c[i]);
if (_raw.size() < 1000) _raw.push_back(c[i]);
_curId.bytes[_curByte] = c[i];
_curByte = (_curByte + 1) % 8;

Expand All @@ -395,11 +403,10 @@ void GeomCache::parseIds(const char *c, size_t size) {
if (_curRow % 1000000 == 0) {
LOG(INFO) << "[GEOMCACHE] "
<< "@ " << _curRow << " (" << std::fixed
<< std::setprecision(2) << getLoadStatusPercent()
<< "%, " << _pointsFSize << " points, " << _linesFSize
<< std::setprecision(2) << getLoadStatusPercent() << "%, "
<< _pointsFSize << " points, " << _linesFSize
<< " (open) polygons (with " << _linePointsFSize
<< " points), " << _geometryDuplicates
<< " duplicates)";
<< " points), " << _geometryDuplicates << " duplicates)";
}

if (_curIdRow < _qidToId.size() && _qidToId[_curIdRow].qid == 0) {
Expand Down Expand Up @@ -432,7 +439,8 @@ void GeomCache::parseIds(const char *c, size_t size) {
// _qidToId; continuation geometries are marked by a
// preliminary qlever ID of 1, while the first geometry always has a
// preliminary id of 0
while (_curIdRow < _qidToId.size() - 1 && _qidToId[_curIdRow + 1].qid == 1) {
while (_curIdRow < _qidToId.size() - 1 &&
_qidToId[_curIdRow + 1].qid == 1) {
_qidToId[++_curIdRow].qid = _curId.val;
}

Expand All @@ -444,7 +452,7 @@ void GeomCache::parseIds(const char *c, size_t size) {
// _____________________________________________________________________________
void GeomCache::parseCount(const char *c, size_t size) {
for (size_t i = 0; i < size; i++) {
if (_raw.size() < 10000) _raw.push_back(c[i]);
if (_raw.size() < 1000) _raw.push_back(c[i]);
if (c[i] == '\n') _state = IN_ROW;
if (_state == IN_ROW) _dangling += c[i];
}
Expand All @@ -456,7 +464,7 @@ size_t GeomCache::requestSize() {
_dangling.clear();
_dangling.reserve(10000);
_raw.clear();
_raw.reserve(10000);
_raw.reserve(1000);

CURLcode res;
char errbuf[CURL_ERROR_SIZE];
Expand Down Expand Up @@ -526,13 +534,15 @@ void GeomCache::requestPart(size_t offset) {
_dangling.clear();
_dangling.reserve(10000);
_raw.clear();
_raw.reserve(10000);
_raw.reserve(1000);
_lastReceivedTime = TIME();
_lastBytesReceived = 0;

CURLcode res;
char errbuf[CURL_ERROR_SIZE];

if (_curl) {
auto qUrl = queryUrl(getQuery(_backendUrl), offset, 1000000);
auto qUrl = queryUrl(getQuery(_backendUrl), offset, 100000000);
curl_easy_setopt(_curl, CURLOPT_URL, qUrl.c_str());
curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, GeomCache::writeCb);
curl_easy_setopt(_curl, CURLOPT_WRITEDATA, this);
Expand Down Expand Up @@ -600,7 +610,7 @@ void GeomCache::request() {
_lastQidToId = {-1, -1};

_raw.clear();
_raw.reserve(100000);
_raw.reserve(1000);

char *pointsFName = strdup("pointsXXXXXX");
int i = mkstemp(pointsFName);
Expand Down Expand Up @@ -657,7 +667,7 @@ void GeomCache::request() {

if (_curRow != _totalSize) {
LOG(WARN) << "Last received row was " << _curRow << ", but expected "
<< _totalSize << " rows (determined via count query)";
<< _totalSize << " rows (determined via count query)";
LOG(WARN) << "Last answer from QLever began with " << _raw;
}

Expand Down Expand Up @@ -720,7 +730,7 @@ void GeomCache::requestIds() {

if (_curRow != _totalSize) {
LOG(WARN) << "Last received row was " << _curRow << ", but expected "
<< _totalSize << " rows (determined via count query)";
<< _totalSize << " rows (determined via count query)";
LOG(WARN) << "Last answer from QLever began with " << _raw;
}

Expand Down
4 changes: 4 additions & 0 deletions src/qlever-petrimaps/GeomCache.h
Original file line number | Diff line number | Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <string>
#include <unordered_map>
#include <vector>
#include <chrono>

#include "qlever-petrimaps/Misc.h"
#include "util/geo/Geo.h"
Expand Down Expand Up @@ -154,6 +155,9 @@ class GeomCache {
size_t _linesFSize;
size_t _qidToIdFSize;

size_t _lastBytesReceived;
std::chrono::time_point<std::chrono::high_resolution_clock> _lastReceivedTime;

std::fstream _pointsF;
std::fstream _linePointsF;
std::fstream _linesF;
Expand Down

0 comments on commit 616440b

Please sign in to comment.