Skip to content

Commit

Permalink
Upgrade to Kafka 3.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Apr 11, 2024
1 parent 9f96cb5 commit 512efbf
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ClientConfig extends AbstractConfig {
public static final long RECONNECT_BACKOFF_MS_DEFAULT = 50L;
public static final long RECONNECT_BACKOFF_MAX_MS_DEFAULT = 1000L;
public static final long RETRY_BACKOFF_MS_DEFAULT = 100L;
public static final long RETRY_BACKOFF_MAX_MS_DEFAULT = 1000L;
public static final int REQUEST_TIMEOUT_MS_DEFAULT = 305000;
public static final long CONNECTIONS_MAX_IDLE_MS_DEFAULT = 9 * 60 * 1000;

Expand Down Expand Up @@ -88,6 +89,12 @@ class ClientConfig extends AbstractConfig {
atLeast(0L),
ConfigDef.Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG,
ConfigDef.Type.LONG,
RETRY_BACKOFF_MAX_MS_DEFAULT,
atLeast(0L),
ConfigDef.Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MAX_MS_DOC)
.define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
ConfigDef.Type.INT,
// chosen to be higher than the default of max.poll.interval.ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public KarelDbCoordinator(
String metricGrpPrefix,
Time time,
long retryBackoffMs,
long retryBackoffMaxMs,
KarelDbIdentity identity,
KarelDbRebalanceListener listener) {
super(
Expand All @@ -77,6 +78,7 @@ public KarelDbCoordinator(
groupId,
Optional.empty(),
retryBackoffMs,
retryBackoffMaxMs,
true
),
logContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class KarelDbLeaderElector implements KarelDbRebalanceListener, UrlProvid
private final Metrics metrics;
private final Metadata metadata;
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private final KarelDbCoordinator coordinator;
private final List<URI> listeners;
private final KarelDbIdentity myIdentity;
Expand Down Expand Up @@ -110,11 +111,13 @@ public KarelDbLeaderElector(KarelDbConfig config, KarelDbEngine engine) throws K

this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
this.retryBackoffMs = clientConfig.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = clientConfig.getLong(CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG);
String groupId = config.getString(KarelDbConfig.CLUSTER_GROUP_ID_CONFIG);
LogContext logContext = new LogContext("[KarelDB clientId=" + clientId + ", groupId="
+ groupId + "] ");
this.metadata = new Metadata(
retryBackoffMs,
retryBackoffMaxMs,
clientConfig.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
logContext,
new ClusterResourceListeners()
Expand Down Expand Up @@ -169,6 +172,7 @@ public KarelDbLeaderElector(KarelDbConfig config, KarelDbEngine engine) throws K
metricGrpPrefix,
time,
retryBackoffMs,
retryBackoffMaxMs,
myIdentity,
this
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class KarelDbCoordinatorTest {
private int rebalanceTimeoutMs = 60;
private int heartbeatIntervalMs = 2;
private long retryBackoffMs = 100;
private long retryBackoffMaxMs = 1000;
private MockTime time;
private MockClient client;
private Cluster cluster = TestUtils.singletonCluster("topic", 1);
Expand All @@ -90,7 +91,8 @@ public class KarelDbCoordinatorTest {
@Before
public void setup() {
this.time = new MockTime();
this.metadata = new Metadata(0, Long.MAX_VALUE, new LogContext(), new ClusterResourceListeners());
this.metadata = new Metadata(
0, Long.MAX_VALUE, Long.MAX_VALUE, new LogContext(), new ClusterResourceListeners());
this.client = new MockClient(time, new MockClient.MockMetadataUpdater() {
@Override
public List<Node> fetchNodes() {
Expand Down Expand Up @@ -124,6 +126,7 @@ public void update(Time time, MockClient.MetadataUpdate update) {
"kdb-" + groupId,
time,
retryBackoffMs,
retryBackoffMaxMs,
LEADER_INFO,
rebalanceListener
);
Expand Down

0 comments on commit 512efbf

Please sign in to comment.