Skip to content

Commit

Permalink
Resolve review comment
Browse files Browse the repository at this point in the history
Signed-off-by: chandankumar4 <[email protected]>
  • Loading branch information
chandankumar4 committed Dec 13, 2023
1 parent 498f8b5 commit 6eaa6c7
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 110 deletions.
31 changes: 30 additions & 1 deletion test/e2e-api/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,21 @@ func init() {
var brokers = []string{bootstrapServers}
http.HandleFunc("/kafka/create-topic", func(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
partitions, replicas, err := getKafkaPartitionAndReplicas(r.URL.Query().Get("partitions"), r.URL.Query().Get("replicas"))
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

admin, err := sarama.NewClusterAdmin(brokers, sarama.NewConfig())
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer admin.Close()
if err = admin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, true); err != nil {
if err = admin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: int32(partitions), ReplicationFactor: int16(replicas)}, true); err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -202,3 +209,25 @@ func init() {
_, _ = fmt.Fprintf(w, "sent %d messages of size %d at %.0f TPS to %q\n", n, mf.size, float64(n)/time.Since(start).Seconds(), topic)
})
}

func getKafkaPartitionAndReplicas(p, r string) (partition int, replicas int, err error) {
if p == "" {
partition = 1
} else {
partition, err = strconv.Atoi(p)
if err != nil {
return 0, 0, err
}
}

if r == "" {
replicas = 1
} else {
replicas, err = strconv.Atoi(r)
if err != nil {
return 0, 0, err
}
}

return partition, replicas, nil
}
4 changes: 2 additions & 2 deletions test/fixtures/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
"time"
)

func CreateKafkaTopic() string {
func CreateKafkaTopic(partition, replicas string) string {
topic := fmt.Sprintf("e2e-topic-%s", rand.String(5))
log.Printf("create Kafka topic %q\n", topic)
InvokeE2EAPI("/kafka/create-topic?topic=%s", topic)
InvokeE2EAPI("/kafka/create-topic?topic=%s&partition=%s&replicas=%s", topic, partition, replicas)
return topic
}

Expand Down
12 changes: 6 additions & 6 deletions test/idle-source-e2e/idle_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Once "threshold" reached to 5s(configurable) and if source is found as idle, the
*/

//go:generate kubectl -n numaflow-system delete statefulset zookeeper kafka-broker --ignore-not-found=true
//go:generate kubectl apply -k ./kafka -n numaflow-system
//go:generate kubectl apply -k ../../config/apps/kafka -n numaflow-system
// Wait for zookeeper to come up
//go:generate sleep 60

Expand Down Expand Up @@ -69,6 +69,7 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithHttpSource() {
return
default:
eventTime := strconv.Itoa(startTime + i*1000)
// SendMessageTo will publish the message to only one replica in case of multiple replicas.
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("X-Numaflow-Event-Time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("X-Numaflow-Event-Time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("3")).WithHeader("X-Numaflow-Event-Time", eventTime))
Expand All @@ -89,17 +90,17 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithKafkaSource() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

inputTopic := "input-topic"
topic := CreateKafkaTopic("2", "2")
fileData, err := os.ReadFile("testdata/kafka-pipeline.yaml")
is.NoError(err)
updatedFileData := strings.ReplaceAll(string(fileData), "my-topic", inputTopic)
updatedFileData := strings.ReplaceAll(string(fileData), "my-topic", topic)

w := is.Given().Pipeline(updatedFileData).When().CreatePipelineAndWait()
// wait for all the pods to come up
w.Expect().VertexPodsRunning()

defer w.DeletePipelineAndWait()
defer DeleteKafkaTopic(inputTopic)
defer DeleteKafkaTopic(topic)

done := make(chan struct{})
go func() {
Expand All @@ -111,14 +112,13 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithKafkaSource() {
case <-done:
return
default:
SendMessage(inputTopic, "data", generateMsg("1", startTime))
SendMessage(topic, "data", generateMsg("1", startTime))
time.Sleep(10 * time.Millisecond)
startTime = startTime.Add(1 * time.Second)
}
}
}()

ExpectKafkaTopicCount(inputTopic, 15, 3*time.Second)
// since the window duration is 10 second, so the count of event will be 10
w.Expect().SinkContains("sink", "10", WithTimeout(120*time.Second))
done <- struct{}{}
Expand Down
92 changes: 0 additions & 92 deletions test/idle-source-e2e/kafka/kafka-minimal.yaml

This file was deleted.

6 changes: 0 additions & 6 deletions test/idle-source-e2e/kafka/kustomization.yaml

This file was deleted.

6 changes: 3 additions & 3 deletions test/kafka-e2e/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type KafkaSuite struct {
}

func (ks *KafkaSuite) TestKafkaSink() {
outputTopic := fixtures.CreateKafkaTopic()
outputTopic := fixtures.CreateKafkaTopic("1", "1")
defer fixtures.DeleteKafkaTopic(outputTopic)
pipeline := &dfv1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -93,8 +93,8 @@ func (ks *KafkaSuite) TestKafkaSink() {
}

func (ks *KafkaSuite) TestKafkaSourceSink() {
inputTopic := fixtures.CreateKafkaTopic()
outputTopic := fixtures.CreateKafkaTopic()
inputTopic := fixtures.CreateKafkaTopic("1", "1")
outputTopic := fixtures.CreateKafkaTopic("1", "1")
pipeline := &dfv1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka-sink-e2e",
Expand Down

0 comments on commit 6eaa6c7

Please sign in to comment.