diff --git a/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java b/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java
index bb2b400b..a8ff95e0 100644
--- a/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java
+++ b/src/main/java/org/peergos/blockstore/s3/S3Blockstore.java
@@ -2,6 +2,8 @@
 
 import io.ipfs.cid.Cid;
 import io.ipfs.multihash.Multihash;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
 import org.peergos.Hash;
 import org.peergos.blockstore.Blockstore;
 import org.peergos.blockstore.RateLimitException;
@@ -32,6 +34,52 @@ public class S3Blockstore implements Blockstore {
 
     private static final Logger LOG = Logging.LOG();
 
+    private static final Histogram readTimerLog = Histogram.build()
+            .labelNames("filesize")
+            .name("s3_block_read_seconds")
+            .help("Time to read a block from immutable storage")
+            .exponentialBuckets(0.01, 2, 16)
+            .register();
+    private static final Histogram writeTimerLog = Histogram.build()
+            .labelNames("filesize")
+            .name("s3_block_write_seconds")
+            .help("Time to write a block to immutable storage")
+            .exponentialBuckets(0.01, 2, 16)
+            .register();
+    private static final Counter blockHeads = Counter.build()
+            .name("s3_block_heads")
+            .help("Number of block heads to S3")
+            .register();
+    private static final Counter blockGets = Counter.build()
+            .name("s3_block_gets")
+            .help("Number of block gets to S3")
+            .register();
+    private static final Counter failedBlockGets = Counter.build()
+            .name("s3_block_get_failures")
+            .help("Number of failed block gets to S3")
+            .register();
+    private static final Counter blockPuts = Counter.build()
+            .name("s3_block_puts")
+            .help("Number of block puts to S3")
+            .register();
+    // Buckets here are block sizes in bytes (64 B doubling up to 2 MiB), not seconds like the timer histograms
+    private static final Histogram blockPutBytes = Histogram.build()
+            .labelNames("size")
+            .name("s3_block_put_bytes")
+            .help("Number of bytes written to S3")
+            .exponentialBuckets(64, 2, 16)
+            .register();
+
+    private static final Counter getRateLimited = Counter.build()
+            .name("s3_get_rate_limited")
+            .help("Number of times we get a http 429 rate limit response during a block get")
+            .register();
+
+    private static final Counter rateLimited = Counter.build()
+            .name("s3_rate_limited")
+            .help("Number of times we get a http 429 rate limit response")
+            .register();
+
     private final String region;
     private final String bucket;
     private final String regionEndpoint;
@@ -245,6 +293,7 @@ private CompletableFuture<Optional<Integer>> getSizeWithoutRetry(Cid cid) {
             PresignedUrl headUrl = S3Request.preSignHead(folder + hashToKey(cid), Optional.of(60),
                     S3AdminRequests.asAwsDate(ZonedDateTime.now()), host, region, accessKeyId, secretKey, useHttps, hasher).join();
             Map<String, List<String>> headRes = HttpUtil.head(headUrl.base, headUrl.fields);
+            blockHeads.inc();
             long size = Long.parseLong(headRes.get("Content-Length").get(0));
             return Futures.of(Optional.of((int)size));
         } catch (FileNotFoundException f) {
@@ -252,8 +301,9 @@ private CompletableFuture<Optional<Integer>> getSizeWithoutRetry(Cid cid) {
             return Futures.of(Optional.empty());
         } catch (IOException e) {
             String msg = e.getMessage();
-            boolean rateLimited = msg.startsWith("SlowDown");
-            if (rateLimited) {
+            boolean rateLimitedResult = msg.startsWith("SlowDown");
+            if (rateLimitedResult) {
+                rateLimited.inc();
                 throw new RateLimitException();
             }
             boolean notFound = msg.startsWith("NoSuchKey");
@@ -277,8 +327,10 @@ private CompletableFuture<Optional<byte[]>> getWithoutRetry(Cid cid) {
         Optional<Pair<Integer, Integer>> range = Optional.empty();
         PresignedUrl getUrl = S3Request.preSignGet(path, Optional.of(600), range,
                 S3AdminRequests.asAwsDate(ZonedDateTime.now()), host, region, accessKeyId, secretKey, useHttps, hasher).join();
+        Histogram.Timer readTimer = readTimerLog.labels("read").startTimer();
         try {
             byte[] block = HttpUtil.get(getUrl.base, getUrl.fields);
+            blockGets.inc();
             blockMetadata.put(cid, block);
             return Futures.of(Optional.of(block));
         } catch (SocketTimeoutException | SSLException e) {
@@ -286,8 +338,10 @@ private CompletableFuture<Optional<byte[]>> getWithoutRetry(Cid cid) {
             throw new RateLimitException();
         } catch (IOException e) {
             String msg = e.getMessage();
-            boolean rateLimited = msg.startsWith("SlowDown");
-            if (rateLimited) {
+            boolean rateLimitedResult = msg.startsWith("SlowDown");
+            if (rateLimitedResult) {
+                getRateLimited.inc();
+                rateLimited.inc();
                 throw new RateLimitException();
             }
             boolean notFound = msg.startsWith("NoSuchKey");
@@ -295,7 +349,10 @@ private CompletableFuture<Optional<byte[]>> getWithoutRetry(Cid cid) {
                 LOG.warning("S3 error reading " + path);
                 LOG.log(Level.WARNING, msg, e);
             }
+            failedBlockGets.inc();
             throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            readTimer.observeDuration();
         }
     }
 
@@ -304,6 +361,7 @@ public CompletableFuture<Cid> put(byte[] block, Cid.Codec codec) {
         return getWithBackoff(() -> putWithoutRetry(block, codec));
     }
     public CompletableFuture<Cid> putWithoutRetry(byte[] block, Cid.Codec codec) {
+        Histogram.Timer writeTimer = writeTimerLog.labels("write").startTimer();
         byte[] hash = Hash.sha256(block);
         Cid cid = new Cid(1, codec, Multihash.Type.sha2_256, hash);
         String key = hashToKey(cid);
@@ -316,15 +374,20 @@ public CompletableFuture<Cid> putWithoutRetry(byte[] block, Cid.Codec codec) {
                     S3AdminRequests.asAwsDate(ZonedDateTime.now()), host, extraHeaders, region, accessKeyId, secretKey, useHttps, hasher).join();
             HttpUtil.put(putUrl.base, putUrl.fields, block);
             blockMetadata.put(cid, block);
+            blockPuts.inc();
+            blockPutBytes.labels("size").observe(block.length);
             return CompletableFuture.completedFuture(cid);
         } catch (IOException e) {
             String msg = e.getMessage();
-            boolean rateLimited = msg.startsWith("SlowDown");
-            if (rateLimited) {
+            boolean rateLimitedResult = msg.startsWith("SlowDown");
+            if (rateLimitedResult) {
+                rateLimited.inc();
                 throw new RateLimitException();
             }
             LOG.log(Level.SEVERE, e.getMessage(), e);
             throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            writeTimer.observeDuration();
         }
     }
 