Skip to content

Commit

Permalink
Merge pull request #66 from Peergos/feat/s3-logging
Browse files Browse the repository at this point in the history
Add prometheus metrics to S3 blockstore
  • Loading branch information
ianopolous authored Oct 24, 2023
2 parents a18626f + da03d07 commit 13fd74b
Showing 1 changed file with 68 additions and 6 deletions.
74 changes: 68 additions & 6 deletions src/main/java/org/peergos/blockstore/s3/S3Blockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.ipfs.cid.Cid;
import io.ipfs.multihash.Multihash;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import org.peergos.Hash;
import org.peergos.blockstore.Blockstore;
import org.peergos.blockstore.RateLimitException;
Expand Down Expand Up @@ -32,6 +34,51 @@ public class S3Blockstore implements Blockstore {

private static final Logger LOG = Logging.LOG();

/**
 * Histogram of how long block reads take, in seconds (metric {@code block_read_seconds}).
 * Buckets start at 0.01 and double 16 times. The single label is named
 * {@code filesize}; the get path in this file fills it with the constant value "read".
 */
private static final Histogram readTimerLog =
        Histogram.build()
                .name("block_read_seconds")
                .help("Time to read a block from immutable storage")
                .labelNames("filesize")
                .exponentialBuckets(0.01, 2, 16)
                .register();
/**
 * Histogram of how long block writes take, in seconds (metric {@code s3_block_write_seconds}).
 * Buckets start at 0.01 and double 16 times. The single label is named
 * {@code filesize}; the put path in this file fills it with the constant value "write".
 */
private static final Histogram writeTimerLog =
        Histogram.build()
                .name("s3_block_write_seconds")
                .help("Time to write a block to immutable storage")
                .labelNames("filesize")
                .exponentialBuckets(0.01, 2, 16)
                .register();
/** Counts HEAD requests to S3; incremented after the HEAD call returns. */
private static final Counter blockHeads =
        Counter.build()
                .help("Number of block heads to S3")
                .name("s3_block_heads")
                .register();

/** Counts GET requests to S3; incremented after the GET call returns a block. */
private static final Counter blockGets =
        Counter.build()
                .help("Number of block gets to S3")
                .name("s3_block_gets")
                .register();

/** Counts block GETs that failed with an I/O error other than a rate-limit response. */
private static final Counter failedBlockGets =
        Counter.build()
                .help("Number of failed block gets to S3")
                .name("s3_block_get_failures")
                .register();

/** Counts PUT requests to S3; incremented after the PUT call returns. */
private static final Counter blockPuts =
        Counter.build()
                .help("Number of block puts to S3")
                .name("s3_block_puts")
                .register();
/**
 * Histogram of the size, in bytes, of each block written to S3
 * (metric {@code s3_block_put_bytes}); the put path observes {@code block.length}.
 *
 * Buckets are sized for byte counts: 16 buckets doubling from 64 B up to
 * 2 MiB (64 * 2^15). The latency-style buckets used previously (0.01 .. ~328)
 * put practically every block into the +Inf bucket, so the histogram carried
 * no size information.
 *
 * The single label is named {@code size}; callers fill it with the constant value "size".
 */
private static final Histogram blockPutBytes =
        Histogram.build()
                .name("s3_block_put_bytes")
                .help("Number of bytes written to S3")
                .labelNames("size")
                .exponentialBuckets(64, 2, 16)
                .register();

/** Counts rate-limit responses seen specifically while getting a block. */
private static final Counter getRateLimited =
        Counter.build()
                .help("Number of times we get a http 429 rate limit response during a block get")
                .name("s3_get_rate_limited")
                .register();

/** Counts rate-limit responses across the head, get and put request paths. */
private static final Counter rateLimited =
        Counter.build()
                .help("Number of times we get a http 429 rate limit response")
                .name("s3_rate_limited")
                .register();

private final String region;
private final String bucket;
private final String regionEndpoint;
Expand Down Expand Up @@ -245,15 +292,17 @@ private CompletableFuture<Optional<Integer>> getSizeWithoutRetry(Cid cid) {
PresignedUrl headUrl = S3Request.preSignHead(folder + hashToKey(cid), Optional.of(60),
S3AdminRequests.asAwsDate(ZonedDateTime.now()), host, region, accessKeyId, secretKey, useHttps, hasher).join();
Map<String, List<String>> headRes = HttpUtil.head(headUrl.base, headUrl.fields);
blockHeads.inc();
long size = Long.parseLong(headRes.get("Content-Length").get(0));
return Futures.of(Optional.of((int)size));
} catch (FileNotFoundException f) {
LOG.warning("S3 404 error reading " + cid);
return Futures.of(Optional.empty());
} catch (IOException e) {
String msg = e.getMessage();
boolean rateLimited = msg.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>SlowDown</Code>");
if (rateLimited) {
boolean rateLimitedResult = msg.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>SlowDown</Code>");
if (rateLimitedResult) {
rateLimited.inc();
throw new RateLimitException();
}
boolean notFound = msg.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>NoSuchKey</Code>");
Expand All @@ -277,25 +326,32 @@ private CompletableFuture<Optional<byte[]>> getWithoutRetry(Cid cid) {
Optional<Pair<Integer, Integer>> range = Optional.empty();
PresignedUrl getUrl = S3Request.preSignGet(path, Optional.of(600), range,
S3AdminRequests.asAwsDate(ZonedDateTime.now()), host, region, accessKeyId, secretKey, useHttps, hasher).join();
Histogram.Timer readTimer = readTimerLog.labels("read").startTimer();
try {
byte[] block = HttpUtil.get(getUrl.base, getUrl.fields);
blockGets.inc();
blockMetadata.put(cid, block);
return Futures.of(Optional.of(block));
} catch (SocketTimeoutException | SSLException e) {
// S3 can't handle the load so treat this as a rate limit and slow down
throw new RateLimitException();
} catch (IOException e) {
String msg = e.getMessage();
boolean rateLimited = msg.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>SlowDown</Code>");
if (rateLimited) {
boolean rateLimitedResult = msg.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>SlowDown</Code>");
if (rateLimitedResult) {
getRateLimited.inc();
rateLimited.inc();
throw new RateLimitException();
}
boolean notFound = msg.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>NoSuchKey</Code>");
if (! notFound) {
LOG.warning("S3 error reading " + path);
LOG.log(Level.WARNING, msg, e);
}
failedBlockGets.inc();
throw new RuntimeException(e.getMessage(), e);
} finally {
readTimer.observeDuration();
}
}

Expand All @@ -304,6 +360,7 @@ public CompletableFuture<Cid> put(byte[] block, Cid.Codec codec) {
return getWithBackoff(() -> putWithoutRetry(block, codec));
}
public CompletableFuture<Cid> putWithoutRetry(byte[] block, Cid.Codec codec) {
Histogram.Timer writeTimer = writeTimerLog.labels("write").startTimer();
byte[] hash = Hash.sha256(block);
Cid cid = new Cid(1, codec, Multihash.Type.sha2_256, hash);
String key = hashToKey(cid);
Expand All @@ -316,15 +373,20 @@ public CompletableFuture<Cid> putWithoutRetry(byte[] block, Cid.Codec codec) {
S3AdminRequests.asAwsDate(ZonedDateTime.now()), host, extraHeaders, region, accessKeyId, secretKey, useHttps, hasher).join();
HttpUtil.put(putUrl.base, putUrl.fields, block);
blockMetadata.put(cid, block);
blockPuts.inc();
blockPutBytes.labels("size").observe(block.length);
return CompletableFuture.completedFuture(cid);
} catch (IOException e) {
String msg = e.getMessage();
boolean rateLimited = msg.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>SlowDown</Code>");
if (rateLimited) {
boolean rateLimitedResult = msg.startsWith("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>SlowDown</Code>");
if (rateLimitedResult) {
rateLimited.inc();
throw new RateLimitException();
}
LOG.log(Level.SEVERE, e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
writeTimer.observeDuration();
}
}

Expand Down

0 comments on commit 13fd74b

Please sign in to comment.