From 096ebcae90235ae8d91ce03e83643c6a24f85d8a Mon Sep 17 00:00:00 2001 From: Eugene R Date: Tue, 14 May 2024 11:45:23 +0300 Subject: [PATCH] chore(examples): update docker compose files (#133) --- examples/kafka/docker-compose.yml | 42 +++++++++++++++++++++--------- examples/kafka/main.go | 6 +++-- examples/pulsar/docker-compose.yml | 22 ++++++++++++---- examples/redis/docker-compose.yml | 2 -- examples/redis/pubsub/main.go | 2 +- examples/redis/stream/main.go | 4 +-- 6 files changed, 54 insertions(+), 24 deletions(-) diff --git a/examples/kafka/docker-compose.yml b/examples/kafka/docker-compose.yml index c227f18..63d8429 100644 --- a/examples/kafka/docker-compose.yml +++ b/examples/kafka/docker-compose.yml @@ -1,20 +1,38 @@ -version: '3' - -# docker exec --interactive --tty kafka-kafka-1 /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test -# docker exec --interactive --tty kafka-kafka-1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --from-beginning +# docker exec --interactive --tty broker /bin/kafka-console-producer --bootstrap-server localhost:9092 --topic test +# docker exec --interactive --tty broker /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test2 --from-beginning +# https://github.com/confluentinc/cp-all-in-one/blob/7.6.1-post/cp-all-in-one-community/docker-compose.yml +# https://docs.confluent.io/platform/current/installation/versions-interoperability.html services: zookeeper: - image: wurstmeister/zookeeper + image: confluentinc/cp-zookeeper:7.6.1 + hostname: zookeeper + container_name: zookeeper ports: - "2181:2181" - kafka: - image: wurstmeister/kafka:2.13-2.8.1 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + broker: + image: confluentinc/cp-kafka:7.6.1 + hostname: broker + container_name: broker + depends_on: + - zookeeper ports: + - "29092:29092" - "9092:9092" + - "9101:9101" environment: - KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 - KAFKA_CREATE_TOPICS: "test:3:1,test2:3:1" - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - volumes: - - /var/run/docker.sock:/var/run/docker.sock + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + AUTO_CREATE_TOPICS: true diff --git a/examples/kafka/main.go b/examples/kafka/main.go index bd51b2f..6e87bcc 100644 --- a/examples/kafka/main.go +++ b/examples/kafka/main.go @@ -12,13 +12,15 @@ import ( ) func main() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + hosts := []string{"127.0.0.1:9092"} - ctx := context.Background() config := sarama.NewConfig() config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin() config.Consumer.Offsets.Initial = sarama.OffsetNewest config.Producer.Return.Successes = true - config.Version, _ = sarama.ParseKafkaVersion("2.8.1") + config.Version, _ = sarama.ParseKafkaVersion("3.6.2") groupID := "testConsumer" source, err := ext.NewKafkaSource(ctx, hosts, groupID, config, "test") diff --git a/examples/pulsar/docker-compose.yml b/examples/pulsar/docker-compose.yml index 2641a62..88bb0d3 100644 --- a/examples/pulsar/docker-compose.yml +++ b/examples/pulsar/docker-compose.yml @@ -1,24 +1,36 @@ -version: '3' +# docker exec --interactive --tty pulsar-standalone /pulsar/bin/pulsar-client produce test1 -m "test" -n 10 +# docker exec --interactive --tty pulsar-standalone /pulsar/bin/pulsar-client consume -s sub test2 -n 0 services: standalone: - image: apachepulsar/pulsar + image: apachepulsar/pulsar:3.2.2 + container_name: pulsar-standalone ports: - "6650:6650" expose: - 8080 - 6650 environment: - - PULSAR_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g" + - BOOKIE_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g" command: > /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone" + networks: + - pulsar-network + dashboard: - image: apachepulsar/pulsar-dashboard + image: apachepulsar/pulsar-dashboard:2.8.1 + container_name: pulsar-dashboard depends_on: - standalone ports: - "80:80" environment: - - SERVICE_URL=http://standalone:8080 \ No newline at end of file + - SERVICE_URL=http://standalone:8080 + networks: + - pulsar-network + +networks: + pulsar-network: + driver: bridge diff --git a/examples/redis/docker-compose.yml b/examples/redis/docker-compose.yml index 1554914..91c13f7 100644 --- a/examples/redis/docker-compose.yml +++ b/examples/redis/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3' - services: redis: image: redis diff --git a/examples/redis/pubsub/main.go b/examples/redis/pubsub/main.go index 605f0c4..3ed3871 100644 --- a/examples/redis/pubsub/main.go +++ b/examples/redis/pubsub/main.go @@ -11,7 +11,6 @@ import ( ext "github.com/reugn/go-streams/redis" ) -// docker exec -it pubsub bash // https://redis.io/topics/pubsub func main() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) @@ -39,5 +38,6 @@ func main() { } var toUpper = func(msg *redis.Message) string { + log.Printf("Got: %s", msg) return strings.ToUpper(msg.Payload) } diff --git a/examples/redis/stream/main.go b/examples/redis/stream/main.go index 342bb79..dc5cc67 100644 --- a/examples/redis/stream/main.go +++ b/examples/redis/stream/main.go @@ -17,7 +17,7 @@ import ( // XREAD COUNT 1 BLOCK 100 STREAMS stream2 0 func main() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) - cancel() + defer cancel() config := &redis.Options{ Addr: "localhost:6379", // use default Addr @@ -52,7 +52,7 @@ func main() { } var toUpper = func(msg *redis.XMessage) *redis.XMessage { - fmt.Printf("Got: %v\n", msg.Values) + log.Printf("Got: %v", msg.Values) values := make(map[string]any, len(msg.Values)) for key, element := range msg.Values { values[key] = strings.ToUpper(fmt.Sprintf("%v", element))