diff --git a/kafka-scheduler/src/main/java/org/apache/mesos/kafka/offer/PersistentOfferRequirementProvider.java b/kafka-scheduler/src/main/java/org/apache/mesos/kafka/offer/PersistentOfferRequirementProvider.java index 07b05c6f..d5b1f5ce 100644 --- a/kafka-scheduler/src/main/java/org/apache/mesos/kafka/offer/PersistentOfferRequirementProvider.java +++ b/kafka-scheduler/src/main/java/org/apache/mesos/kafka/offer/PersistentOfferRequirementProvider.java @@ -79,7 +79,12 @@ public OfferRequirement getUpdateOfferRequirement(String configName, TaskInfo ta String brokerName = taskInfo.getName(); Integer brokerId = OfferUtils.nameToId(brokerName); String taskId = taskInfo.getTaskId().getValue(); + String persistenceId = OfferUtils.getPersistenceId(taskInfo); + if (persistenceId == null) { + persistenceId = UUID.randomUUID().toString(); + } + TaskInfo newTaskInfo = getTaskInfo(configName, config, persistenceId, brokerId, taskId); log.info("newTaskInfo: " + newTaskInfo);