adding the forceSync flag for handling conflicting topic configs #170

Closed · wants to merge 6 commits
Changes from 5 commits
CHANGELOG.md (1 addition, 0 deletions)
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [0.5.1] - SNAPSHOT
### Added
- Updated `README` with Expedia Group stream registry announcement (#155)
- Added the `forceSync` flag for handling conflicting topic configs while upserting stream (#114)

### Changed
- Updated mkdocs.yml to `expediagroup` (#168)
@@ -133,6 +133,13 @@ public class Stream {
@ApiModelProperty(example = "3")
int replicationFactor;

/**
* Should the provided topic config be forcibly synced with the existing, underlying topic (if not the same)?
*/
@ApiModelProperty(example = "false")
@Builder.Default
Boolean forceSync = false;

@JsonPOJOBuilder(withPrefix = "")
public static final class StreamBuilder {
}
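
For illustration only (not part of the diff): with Lombok's @Builder.Default, forceSync stays false unless a caller sets it explicitly. A minimal sketch of opting in through the generated builder, with the other model fields omitted for brevity:

// Sketch: request a forced sync of conflicting topic configs when upserting this stream.
Stream stream = Stream.builder()
        .replicationFactor(3)
        .forceSync(true)   // callers that omit this keep the default of false
        .build();
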
@@ -98,11 +98,12 @@ public interface InfraManager {
* @param replicationFactor replicationFactor for each of those topics
* @param topicConfig topic config to use for each of these topics
* @param isNewStream whether or not this invocation results from existing or new stream in stream registry.
* @param forceSync whether or not the provided topic config should be forcibly synced with the existing, underlying topic (if not the same).
* @throws StreamCreationException when the Stream could not be created in the underlying infrastructure, for the following reason:
* a) the input configs and the existing configs do not match for a new Stream on-boarded to StreamRegistry,
* but already available in the infrastructure.
*/
void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties topicConfig, boolean isNewStream)
void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties topicConfig, boolean isNewStream, boolean forceSync)
throws StreamCreationException;

}
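
As an illustration of the new contract (not part of the diff), a caller might look like the sketch below; the topic name, partition/replication counts, and config values are hypothetical, and java.util.Collections, java.util.Properties and an InfraManager instance are assumed to be in scope:

// Sketch: with forceSync=false, a config mismatch on a topic that already exists for a
// newly on-boarded stream raises StreamCreationException; with forceSync=true the
// existing broker config is retained and no exception is thrown.
Properties topicConfig = new Properties();
topicConfig.put("retention.ms", "86400000");
try {
    infraManager.upsertTopics(Collections.singleton("my-topic"), 4, 3, topicConfig, true, false);
} catch (StreamCreationException e) {
    // configs conflicted and forceSync was disabled
}
infraManager.upsertTopics(Collections.singleton("my-topic"), 4, 3, topicConfig, true, true);
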
@@ -182,7 +182,7 @@ private void upsertTopics(Stream stream, String vpc, boolean isNewStream) throws
topicConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
topicConfig.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, zkConnect);

infraManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig, isNewStream);
infraManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig, isNewStream, stream.getForceSync());
log.info("Topic {} created/updated at {}", stream.getName(), bootstrapServer);
}

@@ -49,7 +49,7 @@ public Optional<ClusterValue> getClusterByKey(ClusterKey key) {
}

@Override
public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties topicConfig, boolean isNewStream) {
public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties topicConfig, boolean isNewStream, boolean forceSync) {
}

@Override
@@ -180,7 +180,7 @@ public void upsertCluster(ClusterKey clusterKey, ClusterValue clusterValue) {
}

@Override
public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties properties, boolean isNewStream)
public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties properties, boolean isNewStream, boolean forceSync)
throws StreamCreationException {
// TODO - Cannot guarantee against race conditions... should probably move to event-source paradigm to
// protect against this (and maybe employ optimistic locking for extra safety).
@@ -202,7 +202,7 @@ public void upsertTopics(Collection<String> topics, int partitions, int replicat
List<String> topicsToCreate = partitionMaps.get(false);

// update any topics that are necessary
updateTopics(zkUtils, topicsToUpdate, topicConfigMap, isNewStream);
updateTopics(zkUtils, topicsToUpdate, topicConfigMap, isNewStream, forceSync);

// now create any topics that were necessary to create this run
createTopics(zkUtils, topicsToCreate, partitions, replicationFactor, topicConfigMap);
@@ -233,7 +233,7 @@ private boolean topicExists(ZkUtils zkUtils, String topic) {
return topicExists;
}

void updateTopics(ZkUtils zkUtils, List<String> topicsToUpdate, Map<String, String> topicConfigMap, boolean isStreamNotAvailableInStreamRegistryDB)
void updateTopics(ZkUtils zkUtils, List<String> topicsToUpdate, Map<String, String> topicConfigMap, boolean isStreamNotAvailableInStreamRegistryDB, boolean forceSync)
throws StreamCreationException {
for (String topic : topicsToUpdate) {
// update topic
@@ -254,8 +254,12 @@ void updateTopics(ZkUtils zkUtils, List<String> topicsToUpdate, Map<String, Stri
// to exactly match downstream config when the stream-registry has not "onboarded" existing topic
// for the first time.

// TODO Alternatively we can add a forceSync=true flag, ignoring any user provided info, and only updating SR with the underlying settings
// We should probably do forceSync=true anyway, as it provides a simple way to keep things in sync (#114)
// Added a forceSync=true flag, ignoring any user provided info, and only updating SR with the underlying settings (#114)
if (forceSync) {
// NOTHING TO DO!
log.info("topic config mismatch for {} ignored. Input config={}, actual (retained) config={}", topic, topicConfigMap, actualTopicConfig);
continue;
Review comment (Contributor):
What exactly is being "forced" if all you do is continue?

Review comment (Author):
Well, the flag is "forcing" the stream-registry to sync the given topic configs with what exists already on the underlying brokers. I guess I can say that by continuing here, the code is "forcing" the flow to not go ahead to the following exception generation...

Review comment (Contributor):
I put alternative names in #114

Review comment (Contributor, @OneCricketeer, Apr 26, 2019):
"ignoring any user provided info, and only updating SR with the underlying settings"

The way I interpret that is to take actualTopicConfig and overwrite topicConfigMap with it... i.e. "forcing" the actual configs to "sync" to the Stream Config
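
For illustration only (not something this PR implements): that reading would roughly amount to adopting the broker's actual config inside the updateTopics loop and then writing it back to stream registry; the write-back call named below is hypothetical.

// Sketch of the alternative discussed above: treat the broker's config as authoritative.
if (forceSync) {
    topicConfigMap.clear();
    actualTopicConfig.forEach((k, v) -> topicConfigMap.put(String.valueOf(k), String.valueOf(v)));
    // Hypothetical follow-up: persist the adopted config back to the stream-registry record,
    // e.g. something like streamRegistryDao.updateTopicConfig(topic, topicConfigMap).
    continue;   // skip the mismatch exception below, as the current patch does
}
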

Review comment (Contributor):
Doesn't continue actually perform the UPDATE???
I agree with @Cricket007 remarks on actualTopicConfig and topicConfigMap... where we should be overriding what the user provided, with what is actually out there.

Review comment (Contributor):
"Doesn't continue actually perform the UPDATE?"

No, it breaks here and goes to the next iteration of the loop. updateTopic isn't called.

}
if (isStreamNotAvailableInStreamRegistryDB) {
throw new StreamCreationException(String.format("Error: Input configs=%s and actual configs=%s are not same for topic=%s",
topicConfigMap, actualTopicConfig, topic));
@@ -97,7 +97,7 @@ public void testUpsertTopicsForNewStream() throws StreamCreationException {
when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true);

//New Stream
kafkaManager.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true);
kafkaManager.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true, false);

// expecting an exception to be thrown because topic exists but request doesn't match the config
}
@@ -110,7 +110,7 @@ public void testUpsertTopicsForExistingStreamWithMatchingConfig() throws StreamC
KafkaInfraManager kafkaManagerSpy = spy(kafkaManager);

// Existing Stream, but PROPS match!! should not have an exception
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, TOPIC_WITH_CNXN_PROPS, true);
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, TOPIC_WITH_CNXN_PROPS, true, false);

// verify change topic DOES NOT HAPPEN because props match
verifyStatic(AdminUtils.class, times(0));
@@ -129,7 +129,7 @@ public void testUpsertTopicsForExistingStream() throws StreamCreationException {
KafkaInfraManager kafkaManagerSpy = spy(kafkaManager);

//Existing Stream
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false);
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false, false);

//verify change topic happens because isNewStream=false
verifyStatic(AdminUtils.class, times(1));
@@ -147,7 +147,7 @@ public void testUpsertTopicsForNewTopic() throws StreamCreationException {
KafkaInfraManager kafkaManagerSpy = spy(kafkaManager);

// Not existing Stream
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true);
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true, false);

//verify create topic happens when requested topic does not exist
verifyStatic(AdminUtils.class, times(0));
@@ -164,7 +164,7 @@ public void testUpsertTopicsForNewTopicExistsInSR() throws StreamCreationExcepti
KafkaInfraManager kafkaManagerSpy = spy(kafkaManager);

//Existing Stream
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false);
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false, false);

// note: this is a weird case, because somehow the stream exists in SR, but the underlying topic does NOT
// might want to consider this corner case a bit more. Currently the behavior honors the request and creates the topic
@@ -176,4 +176,46 @@ public void testUpsertTopicsForNewTopicExistsInSR() throws StreamCreationExcepti
verifyStatic(AdminUtils.class, times(1));
AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$);
}

@Test(expected = StreamCreationException.class)
public void testUpsertTopicsForExistingStreamWithMismatchingConfigForceSyncDisabled() throws StreamCreationException {
// Mock it as an existing topic
when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true);

KafkaInfraManager kafkaManagerSpy = spy(kafkaManager);

// Topic exists in Kafka but is new to stream registry; props do NOT match and forceSync=false, so an exception is expected
Properties prop = new Properties();
prop.put("k1", "v1");
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, prop, true, false);

// verify change topic DOES NOT HAPPEN because the config mismatch throws before any update
verifyStatic(AdminUtils.class, times(0));
AdminUtils.changeTopicConfig(zkUtils, TOPIC, TOPIC_PROPS);

// verify create topic DOES NOT HAPPEN because the topic already exists
verifyStatic(AdminUtils.class, times(0));
AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, TOPIC_PROPS, RackAwareMode.Enforced$.MODULE$);
}

@Test
public void testUpsertTopicsForExistingStreamWithMismatchingConfigForceSyncEnabled() throws StreamCreationException {
// Mock it as an existing topic
when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true);

KafkaInfraManager kafkaManagerSpy = spy(kafkaManager);

// Topic exists in Kafka with mismatching props, but forceSync=true, so the existing config is retained and no exception is expected
Properties prop = new Properties();
prop.put("MAX_MESSAGE_BYTES_CONFIG", "64000");
kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, prop, true, true);

// verify change topic DOES NOT HAPPEN because forceSync keeps the existing broker config
verifyStatic(AdminUtils.class, times(0));
AdminUtils.changeTopicConfig(zkUtils, TOPIC, TOPIC_PROPS);

// verify create topic DOES NOT HAPPEN because the topic already exists
verifyStatic(AdminUtils.class, times(0));
AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, TOPIC_PROPS, RackAwareMode.Enforced$.MODULE$);
}
}
schema-library-events/src/main/avro/stream.avdl (6 additions, 0 deletions)
@@ -196,6 +196,12 @@ protocol StreamRegistryProtocol {
* Replication Factor
*/
int replicationFactor = 3;

/**
* Should the provided topic config be forcibly synced with the existing, underlying topic (if not the same)?
*/
boolean forceSync = false;

}

record AvroStreamKey {
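
For illustration only (not part of the diff): Avro IDL generates a Java builder with a setter for the new field. The generated class name AvroStream is an assumption inferred from the AvroStreamKey record above, and the other required fields are omitted for brevity.

// Sketch: setting forceSync on the Avro event (it defaults to false per the IDL default).
AvroStream avroStream = AvroStream.newBuilder()   // hypothetical generated class name
        .setForceSync(true)
        .build();
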