Skip to content

Commit

Permalink
Add gRPC support to Vanilla (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch authored Dec 30, 2024
1 parent 5b6a085 commit a70db19
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 20 deletions.
12 changes: 6 additions & 6 deletions latte/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
plugins {
`java-library`
kotlin("jvm") version "1.9.20"
id("com.google.devtools.ksp") version "1.9.20-1.0.14"
kotlin("jvm") version "2.0.21"
id("com.google.devtools.ksp") version "2.0.21-1.0.25"
}

group = "gg.beemo.latte"
Expand All @@ -10,7 +10,7 @@ version = "1.0.0"
dependencies {

// Kotlin
val kotlinCoroutinesVersion = "1.7.3"
val kotlinCoroutinesVersion = "1.9.0"
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinCoroutinesVersion")

Expand All @@ -24,18 +24,18 @@ dependencies {
implementation("com.rabbitmq:amqp-client:$rabbitVersion")

// JSON
val moshiVersion = "1.14.0"
val moshiVersion = "1.15.1"
implementation("com.squareup.moshi:moshi:$moshiVersion")
ksp("com.squareup.moshi:moshi-kotlin-codegen:$moshiVersion")

// Misc
implementation("org.jetbrains:annotations:24.1.0")
val log4jVersion = "2.22.0"
val log4jVersion = "2.24.1"
compileOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
testImplementation("org.apache.logging.log4j:log4j-core:$log4jVersion")

// JUnit testing framework
val junitVersion = "5.10.1"
val junitVersion = "5.11.2"
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion")

Expand Down
3 changes: 3 additions & 0 deletions latte/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,8 @@
<Logger name="org.apache.kafka" level="WARN">
<AppenderRef ref="Console" />
</Logger>
<Logger name="io.grpc.netty" level="INFO">
<AppenderRef ref="Console" />
</Logger>
</Loggers>
</Configuration>
28 changes: 28 additions & 0 deletions proto/ratelimit.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
syntax = "proto3";
package beemo.ratelimit;

option java_multiple_files = true;
option java_package = "gg.beemo.latte.proto";
option java_outer_classname = "RatelimitProto";


service Ratelimit {
rpc ReserveQuota(RatelimitRequest) returns (RatelimitQuota);
}

message RatelimitRequest {
RatelimitType type = 1;
fixed64 client_id = 2;
optional bool probe_only = 3;
optional uint32 max_delay = 4;
}

message RatelimitQuota {
bool granted = 1;
uint64 at = 2;
}

enum RatelimitType {
GLOBAL = 0;
IDENTIFY = 1;
}
13 changes: 10 additions & 3 deletions vanilla/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
# your environment variables in quotes unless it is part of it.

#------------
# Kafka Connection
# RabbitMQ Connection
#---------------
KAFKA_HOST=
KAFKA_USE_TLS=
RABBIT_HOST=
RABBIT_USE_TLS=
RABBIT_USERNAME=
RABBIT_PASSWORD=

#------------
# gRPC Server
#---------------
GRPC_PORT=
42 changes: 39 additions & 3 deletions vanilla/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,27 +1,63 @@
plugins {
application
java
kotlin("jvm") version "1.9.20"
kotlin("jvm") version "2.0.21"
id("com.google.protobuf") version "0.9.4"
}

group = "gg.beemo.vanilla"
version = "1.0.0"

val grpcVersion = "1.68.0"
val grpcKotlinStubVersion = "1.4.1"
val grpcProtobufVersion = "4.28.2"

dependencies {
// Kotlin
val kotlinCoroutinesVersion = "1.7.3"
val kotlinCoroutinesVersion = "1.9.0"
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")

// Beemo shared code
implementation("gg.beemo.latte:latte")

// gRPC
implementation("io.grpc:grpc-netty-shaded:$grpcVersion")
implementation("io.grpc:grpc-protobuf:$grpcVersion")
implementation("io.grpc:grpc-kotlin-stub:$grpcKotlinStubVersion")
implementation("com.google.protobuf:protobuf-kotlin:$grpcProtobufVersion")

// Logging
val log4jVersion = "2.22.0"
val log4jVersion = "2.24.1"
implementation("org.apache.logging.log4j:log4j-api:$log4jVersion")
implementation("org.apache.logging.log4j:log4j-core:$log4jVersion")
implementation("org.apache.logging.log4j:log4j-slf4j2-impl:$log4jVersion")
}

protobuf {
protoc {
artifact = "com.google.protobuf:protoc:$grpcProtobufVersion"
}
plugins {
create("grpc") {
artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion"
}
create("grpckt") {
artifact = "io.grpc:protoc-gen-grpc-kotlin:$grpcKotlinStubVersion:jdk8@jar"
}
}
generateProtoTasks {
all().forEach {
it.plugins {
create("grpc")
create("grpckt")
}
it.builtins {
create("kotlin")
}
}
}
}

repositories {
mavenCentral()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import kotlin.time.Duration.Companion.seconds
// Give request expiry a bit of leeway in case of clock drift
private val EXPIRY_GRACE_PERIOD = 5.seconds.inWholeMilliseconds

class RatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
class BrokerRpcRatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {

private val log by Log
private val globalRatelimitProvider = RatelimitProvider(50, 1.seconds)
private val identifyRatelimitProvider = RatelimitProvider(1, 5.seconds)
private val globalRatelimitProvider = KafkaRatelimitProvider(50, 1.seconds)
private val identifyRatelimitProvider = KafkaRatelimitProvider(1, 5.seconds)

init {
rpc<SharedRatelimitData.RatelimitRequestData, Unit>(
Expand Down Expand Up @@ -54,7 +54,7 @@ class RatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {

}

private class RatelimitProvider(private val burst: Int, private val duration: Duration) {
private class KafkaRatelimitProvider(private val burst: Int, private val duration: Duration) {

private val limiters = ConcurrentHashMap<String, SuspendingRatelimit>()

Expand Down
2 changes: 2 additions & 0 deletions vanilla/src/main/java/gg/beemo/vanilla/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ public class Config {

public static String RABBIT_PASSWORD = "guest";

public static int GRPC_PORT = 1337;

}
134 changes: 134 additions & 0 deletions vanilla/src/main/java/gg/beemo/vanilla/GrpcRatelimitService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package gg.beemo.vanilla

import gg.beemo.latte.logging.Log
import gg.beemo.latte.proto.RatelimitGrpcKt
import gg.beemo.latte.proto.RatelimitQuota
import gg.beemo.latte.proto.ratelimitQuota
import gg.beemo.latte.proto.RatelimitRequest
import gg.beemo.latte.proto.RatelimitType
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.LinkedList
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

class GrpcRatelimitService : RatelimitGrpcKt.RatelimitCoroutineImplBase() {

private val log by Log
private val globalRatelimits = ClientRatelimits(50, 1.seconds)
private val identifyRatelimits = ClientRatelimits(1, 5.seconds)

override suspend fun reserveQuota(request: RatelimitRequest): RatelimitQuota {
val clientRatelimits = when (request.type) {
RatelimitType.GLOBAL -> globalRatelimits
RatelimitType.IDENTIFY -> identifyRatelimits
else -> throw IllegalArgumentException("Unknown ratelimit type ${request.type}")
}

val ratelimit = clientRatelimits.getClientRatelimit(request.clientId)
val maxDelay = if (request.hasMaxDelay()) request.maxDelay.toLong() else null
val (granted, at) = ratelimit.reserveQuota(request.probeOnly, maxDelay)
val delay = (at - System.currentTimeMillis()).coerceAtLeast(0)

if (request.probeOnly) {
log.debug(
"Probed {} quota slot for clientId {} is at {} (in {} ms)",
request.type,
request.clientId,
at,
delay
)
} else if (granted) {
log.debug(
"Reserved {} quota slot for clientId {} at {} (in {} ms)",
request.type,
request.clientId,
at,
delay
)
} else {
val maxTimestamp = if (maxDelay != null) System.currentTimeMillis() + maxDelay else null
log.debug(
"Failed to reserve {} quota slot for clientId {}, next slot would be at {} (in {} ms), requested max delay was {} (-> {})",
request.type,
request.clientId,
at,
delay,
maxDelay,
maxTimestamp
)
}

return ratelimitQuota {
this.granted = granted
this.at = at
}
}

}

private class ClientRatelimits(private val burst: Int, private val duration: Duration) {

private val limiters = ConcurrentHashMap<Long, RatelimitQueue>()

fun getClientRatelimit(clientId: Long): RatelimitQueue = limiters.computeIfAbsent(clientId) {
RatelimitQueue(burst, duration)
}

}

data class RatelimitSlot(
var usedQuota: Int,
val startsAt: Long,
val endsAt: Long,
)

private class RatelimitQueue(private val burst: Int, private val duration: Duration) {

private val queue = LinkedList<RatelimitSlot>()
private val lock = Mutex()

suspend fun reserveQuota(probeOnly: Boolean = false, maxDelay: Long? = null): Pair<Boolean, Long> =
lock.withLock {
val now = System.currentTimeMillis()

// Clean up expired slots
while (queue.isNotEmpty() && now > queue.first.endsAt) {
queue.removeFirst()
}

// Find free slot at the end of the queue
val lastSlot = queue.lastOrNull()
// No slots are used, so ratelimit is immediately available
if (lastSlot == null) {
// No timeout to check if we can immediately grant quota
if (probeOnly) {
return@withLock false to 0
}
queue.add(RatelimitSlot(1, now, now + duration.inWholeMilliseconds))
return@withLock true to 0
}

// Check if slot still has quota available
if (lastSlot.usedQuota < burst) {
val exceedsDelay = maxDelay != null && lastSlot.startsAt > now + maxDelay
if (exceedsDelay || probeOnly) {
return@withLock false to lastSlot.startsAt
}
lastSlot.usedQuota++
return@withLock true to lastSlot.startsAt
}

// Slot is full, create new slot
val exceedsDelay = maxDelay != null && lastSlot.endsAt > now + maxDelay
if (exceedsDelay || probeOnly) {
return@withLock false to lastSlot.endsAt
}
val nextStart = lastSlot.endsAt
val nextEnd = nextStart + duration.inWholeMilliseconds
queue.add(RatelimitSlot(1, nextStart, nextEnd))
return@withLock true to nextStart
}

}
16 changes: 12 additions & 4 deletions vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import gg.beemo.latte.CommonConfig
import gg.beemo.latte.broker.rabbitmq.RabbitConnection
import gg.beemo.latte.config.Configurator
import gg.beemo.latte.logging.Log
import gg.beemo.latte.logging.log
import io.grpc.Server
import io.grpc.ServerBuilder
import kotlinx.coroutines.runBlocking
import org.apache.logging.log4j.LogManager

Expand All @@ -22,19 +23,26 @@ object Vanilla {
val brokerConnection = RabbitConnection(
rabbitHosts = Config.RABBIT_HOST,
serviceName = CommonConfig.BrokerServices.VANILLA,
instanceId = "0", // There will only ever be one instance of vanilla
instanceId = "0", // There will only ever be one instance of vanilla
useTls = Config.RABBIT_USE_TLS,
username = Config.RABBIT_USERNAME,
password = Config.RABBIT_PASSWORD,
)

log.debug("Initializing Kafka Ratelimit client")
val ratelimitClient = RatelimitClient(brokerConnection)
log.debug("Initializing Broker Ratelimit client")
val ratelimitClient = BrokerRpcRatelimitClient(brokerConnection)

log.debug("Initializing gRPC Ratelimit client")
val grpcServer: Server = ServerBuilder.forPort(Config.GRPC_PORT)
.addService(GrpcRatelimitService())
.build()
.start()

Runtime.getRuntime().addShutdownHook(Thread({
log.info("Destroying everything")
ratelimitClient.destroy()
brokerConnection.destroy()
grpcServer.shutdown().awaitTermination()
LogManager.shutdown(true, true)
}, "Vanilla Shutdown Hook"))

Expand Down
1 change: 1 addition & 0 deletions vanilla/src/main/proto/ratelimit.proto

0 comments on commit a70db19

Please sign in to comment.