-
-
Notifications
You must be signed in to change notification settings - Fork 511
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(kafka): additional advertised listeners/broker addresses for kafka #2920
base: main
Are you sure you want to change the base?
feat(kafka): additional advertised listeners/broker addresses for kafka #2920
Conversation
✅ Deploy Preview for testcontainers-go ready!
To edit notification comments on pull requests, go to your Netlify site configuration. |
28c77cc
to
1ff166c
Compare
1ff166c
to
8cffdbf
Compare
Thanks @ghthor for your work here, I saw you linked to #2894, which is providing support for dynamically adding listeners to kafka. IIUC this PR is adding fixed listeners to it. What are the implications of having one or the other worlds for the kafka module here? In other words: if we merge this one, do we need the other one? @eddumelendez could you take a look at this PR too? |
@mdelapenya I think that most of the usecases for dynamic listeners would be provided by this one. The case that wouldn't be covered by this PR is if you need listeners that are a different protocol other than There is no reason that adding more default fixed listeners could not co-exist with the feature to add dynamic listeners. I don't know that I've found a reason to use dynamic listeners yet, and they add many elements of complication, as to declare them you need to know much information upfront or configure more stuff to make them work (hostnames / ports) etc. The aim of this PR is to avoid as much boilerplate as possible to achieve Container <> Container support. I gave the user 2 options to achieve this aim.
You do either of those and you get Container <> Container kafka working AND we can assert all this is working within the kafka module tests because we're not dealing with user configurable listeners. |
A fan of this! Thanks for the explanation We'll start reviewing this PR today, and will let @eddumelendez take a look for more kafka-specific questions (I'm totally illiterate on it) |
|
8cffdbf
to
bb67aff
Compare
@mdelapenya can you authorize another run of the CI test suite? |
@ghthor could you run
|
bb67aff
to
7395ce5
Compare
@mdelapenya Done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update, I've done an initial pass on this, theres a few bugs, suggestions and questions.
docs/modules/kafka.md
Outdated
string slice, containing the hostname `host.docker.internal` and a random port | ||
defined by Kafka's public port (`19092/tcp`). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: clarify what the 19092 port is, it's not clear on reading if its an example of random port or is that re-enforcing the standard port
modules/kafka/examples_test.go
Outdated
@@ -19,7 +27,7 @@ func ExampleRun() { | |||
) | |||
defer func() { | |||
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil { | |||
log.Printf("failed to terminate container: %s", err) | |||
log.Fatalf("failed to terminate container: %s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bug: this breaks clean up, please revert.
modules/kafka/examples_test.go
Outdated
// Clean up the container after | ||
defer func() { | ||
if err := kafkaContainer.Terminate(ctx); err != nil { | ||
log.Fatalf("failed to terminate container: %s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bug: this should be Printf
modules/kafka/examples_test.go
Outdated
} | ||
}() | ||
|
||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: no need for wrapping braces which make this harder to read, more below.
docs/modules/kafka.md
Outdated
network. | ||
|
||
<!--codeinclude--> | ||
[Start Kafka inside a docker network](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kafka |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bug: looks like wrong name here says by docker network and then links to by container.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is correct, It's a 2 part explanation, I was trying to hightlight that this works best (maybe ONLY works) if you start the Kafka testcontainer inside a non-default docker network. So I broke this into 2 codeincludes, one focusing on starting the Kafka test container in a non-default network and the second part of starting another container in that same network.
modules/kafka/kafka_test.go
Outdated
// Clean up the container after the test is complete | ||
t.Cleanup(func() { | ||
require.NoError(t, kafkaContainer.Terminate(ctx), "failed to terminate container: %v", err) | ||
}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bug: this isn't needed as that's what CleanContainer
does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent 🍾
modules/kafka/kafka_test.go
Outdated
require.Truef(t, strings.EqualFold(string(consumer.message.Value), "value"), "expected value to be %s, got %s", "value", string(consumer.message.Value)) | ||
require.Truef(t, strings.EqualFold(string(consumer.message.Value), value), "expected value to be %s, got %s", value, string(consumer.message.Value)) | ||
|
||
assertBrokers := func( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: move to separate method to avoid the function getting so large.
modules/kafka/kafka_test.go
Outdated
t.Cleanup(func() { | ||
require.NoError(t, kcat.Terminate(ctx), "failed to terminate container") | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bug: use CleanupContainer
before the error check.
assert := func(listener string) { | ||
t.Helper() | ||
require.Containsf(t, bs, listener, "expected advertised listeners to contain %s, got %s", listener, bs) | ||
} | ||
|
||
mustBrokers := func(fn func(context.Context) ([]string, error)) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: move to helper functions to avoid function bloat.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Felt like this one benefited from being a internal closure, it's short, and requires to many parameters when lifted up to a function.
@eddumelendez what are your thoughts on this PR? |
bcffa98
to
7874ca9
Compare
7874ca9
to
2203abe
Compare
t.Run("BrokersByHostDockerInternal", func(t *testing.T) { | ||
brokers, err := kafkaContainer.BrokersByHostDockerInternal(ctx) | ||
require.NoError(t, err) | ||
|
||
kcat, err := runKcatContainer(ctx, brokers, func(hc *container.HostConfig) { | ||
hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") | ||
}, nil) | ||
testcontainers.CleanupContainer(t, kcat) | ||
require.NoError(t, err) | ||
|
||
l, err := kcat.Logs(ctx) | ||
require.NoError(t, err) | ||
defer l.Close() | ||
|
||
assertKcatReadMsg(t, l) | ||
}) | ||
t.Run("BrokersByContainerId", func(t *testing.T) { | ||
brokers, err := kafkaContainer.BrokersByContainerId(ctx) | ||
require.NoError(t, err) | ||
|
||
kcat, err := runKcatContainer(ctx, brokers, nil, []string{net.Name}) | ||
testcontainers.CleanupContainer(t, kcat) | ||
require.NoError(t, err) | ||
|
||
l, err := kcat.Logs(ctx) | ||
require.NoError(t, err) | ||
defer l.Close() | ||
|
||
assertKcatReadMsg(t, l) | ||
}) | ||
t.Run("BrokersByContainerName", func(t *testing.T) { | ||
brokers, err := kafkaContainer.BrokersByContainerName(ctx) | ||
require.NoError(t, err) | ||
|
||
kcat, err := runKcatContainer(ctx, brokers, nil, []string{net.Name}) | ||
testcontainers.CleanupContainer(t, kcat) | ||
require.NoError(t, err) | ||
|
||
l, err := kcat.Logs(ctx) | ||
require.NoError(t, err) | ||
defer l.Close() | ||
|
||
assertKcatReadMsg(t, l) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevenh Much happier with this after your suggestion to not use that giant closure here 🫡
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The proposed idea is good; however, I lean towards prioritizing user flexibility. Introducing these changes as defaults could potentially break existing users who have already defined their own listeners.
Additionally, there is a specific use case that this approach does not address: scenarios where tools like Toxiproxy are used in front of Kafka to simulate network conditions and test resiliency. This highlights the importance of maintaining configurability to accommodate diverse testing and deployment needs.
fmt.Sprintf("LOCALHOST://%s:%d", host, portLh.Int()), | ||
fmt.Sprintf("HOST_DOCKER_INTERNAL://%s:%d", "host.docker.internal", portDh.Int()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
both have the same purpose. and LOCALHOST is more generic for CI and local usage. IMO, we just need to keep the first one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't use a broker address of localhost
from another docker container, that's the entire point of this proposed addition.
Do you have an example of how to do that, as this change is proposed because that's not really possible to do.
Do you have an example of what this looks like. I don't understand how adding a few more listeners and advertised listeners would break this. |
I could see what it looks like to put all this behind functional options. It would differ from the other PR in that we're not going to allow you to configure any of them, just whether they are enabled or not. This doesn't have a benefit that we could assert some things and produce errors if the API isn't used correctly. Example being, if you want to use the Using the |
I have reviewed various use cases and examined bugs reported in other language implementations. Below is a summary of the key requirements we should address in the API:
|
What does this PR do?
Why is it important?
localhost:<random port>
localhost
is not a valid address to kafka when running within another docker containerThis change is intended to support more complicated testing setups where the test code is running itself is another docker container, thus needs kafka to advertise a connection string other than
localhost
.Related issues
How to test this PR