diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go
index ea9d8638a..51f14a00b 100644
--- a/backend/runner/pubsub/consumer.go
+++ b/backend/runner/pubsub/consumer.go
@@ -126,6 +126,7 @@ func (c *consumer) subscribe(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
+			c.group.Close()
 			return
 		default:
 		}
diff --git a/backend/runner/pubsub/integration_test.go b/backend/runner/pubsub/integration_test.go
index c628799df..20e3d23d7 100644
--- a/backend/runner/pubsub/integration_test.go
+++ b/backend/runner/pubsub/integration_test.go
@@ -3,16 +3,20 @@
 package pubsub
 
 import (
+	"os"
+	"path/filepath"
 	"strings"
 	"sync"
 	"testing"
 	"time"
 
 	"connectrpc.com/connect"
+	"github.com/IBM/sarama"
 	"github.com/alecthomas/assert/v2"
 	"github.com/alecthomas/types/optional"
 	timelinepb "github.com/block/ftl/backend/protos/xyz/block/ftl/timeline/v1"
 	"github.com/block/ftl/common/slices"
+	"github.com/block/ftl/internal/exec"
 	in "github.com/block/ftl/internal/integration"
 	"github.com/block/ftl/internal/model"
 )
@@ -84,6 +88,68 @@ func TestExternalPublishRuntimeCheck(t *testing.T) {
 	)
 }
 
+// TestConsumerGroupMembership tests that when a runner ends, the consumer group is properly exited.
+func TestConsumerGroupMembership(t *testing.T) {
+	var deploymentKilledTime *time.Time
+	in.Run(t,
+		in.WithLanguages("go"),
+		in.WithPubSub(),
+		in.CopyModule("publisher"),
+		in.CopyModule("subscriber"),
+		in.Deploy("publisher"),
+		in.Deploy("subscriber"),
+
+		// consumer group must now have a member for each partition
+		checkGroupMembership("subscriber", "consumeSlow", 1),
+
+		// publish events that will take a long time to process on the first subscriber deployment
+		// to test that rebalancing doesnt cause consumption to fail and skip events
+		in.Repeat(100, in.Call("publisher", "publishSlow", in.Obj{}, func(t testing.TB, resp in.Obj) {})),
+
+		// Upgrade deployment
+		func(t testing.TB, ic in.TestContext) {
+			in.Infof("Modifying code")
+			path := filepath.Join(ic.WorkingDir(), "subscriber", "subscriber.go")
+
+			bytes, err := os.ReadFile(path)
+			assert.NoError(t, err)
+			output := strings.ReplaceAll(string(bytes), "This deployment is TheFirstDeployment", "This deployment is TheSecondDeployment")
+			assert.NoError(t, os.WriteFile(path, []byte(output), 0644))
+		},
+		in.Deploy("subscriber"),
+
+		// Currently old deployment runs for a little bit longer.
+		// During this time we expect the consumer group to have 2 members (old deployment and new deployment).
+		// This will probably change when we have proper draining of the old deployment.
+		checkGroupMembership("subscriber", "consumeSlow", 2),
+		func(t testing.TB, ic in.TestContext) {
+			in.Infof("Waiting for old deployment to be killed")
+			start := time.Now()
+			for {
+				assert.True(t, time.Since(start) < 15*time.Second)
+				ps, err := exec.Capture(ic.Context, ".", "ftl", "ps")
+				assert.NoError(t, err)
+				if strings.Count(string(ps), "dpl-subscriber-") == 1 {
+					// original deployment has ended
+					now := time.Now()
+					deploymentKilledTime = &now
+					return
+				}
+			}
+		},
+		// Once old deployment has ended, the consumer group should only have 1 member per partition (the new deployment)
+		// This should happen fairly quickly. If it takes a while it could be because the previous deployment did not close
+		// the group properly.
+		checkGroupMembership("subscriber", "consumeSlow", 1),
+		func(t testing.TB, ic in.TestContext) {
+			assert.True(t, time.Since(*deploymentKilledTime) < 3*time.Second, "make sure old deployment was removed from consumer group fast enough")
+		},
+
+		// confirm that each message was consumed successfully
+		checkConsumed("subscriber", "consumeSlow", true, 100, optional.None[string]()),
+	)
+}
+
 func publishToTestAndLocalTopics(calls int) in.Action {
 	// do this in parallel because we want to test race conditions
 	return func(t testing.TB, ic in.TestContext) {
@@ -161,3 +227,22 @@ func checkConsumed(module, verb string, success bool, count int, needle optional
 		}
 	}
 }
+func checkGroupMembership(module, subscription string, expectedCount int) in.Action {
+	return func(t testing.TB, ic in.TestContext) {
+		consumerGroup := module + "." + subscription
+		in.Infof("Checking group membership for %v", consumerGroup)
+
+		client, err := sarama.NewClient(in.RedPandaBrokers, sarama.NewConfig())
+		assert.NoError(t, err)
+		defer client.Close()
+
+		clusterAdmin, err := sarama.NewClusterAdminFromClient(client)
+		assert.NoError(t, err)
+		defer clusterAdmin.Close()
+
+		groups, err := clusterAdmin.DescribeConsumerGroups([]string{consumerGroup})
+		assert.NoError(t, err)
+		assert.Equal(t, len(groups), 1)
+		assert.Equal(t, len(groups[0].Members), expectedCount, "expected consumer group %v to have %v members", consumerGroup, expectedCount)
+	}
+}
diff --git a/backend/runner/pubsub/testdata/go/publisher/publisher.go b/backend/runner/pubsub/testdata/go/publisher/publisher.go
index 8ec3213b5..6316561f5 100644
--- a/backend/runner/pubsub/testdata/go/publisher/publisher.go
+++ b/backend/runner/pubsub/testdata/go/publisher/publisher.go
@@ -8,6 +8,11 @@ import (
 	// Import the FTL SDK.
 )
 
+type PubSubEvent struct {
+	Time     time.Time
+	Haystack string
+}
+
 type PartitionMapper struct{}
 
 var _ ftl.TopicPartitionMap[PubSubEvent] = PartitionMapper{}
@@ -21,10 +26,8 @@ type TestTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]
 
 type LocalTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]
 
-type PubSubEvent struct {
-	Time     time.Time
-	Haystack string
-}
+//ftl:export
+type SlowTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]
 
 //ftl:verb
 func PublishTen(ctx context.Context, topic TestTopic) error {
@@ -87,3 +90,13 @@ func Local(ctx context.Context, event PubSubEvent) error {
 	ftl.LoggerFromContext(ctx).Infof("Consume local: %v", event.Time)
 	return nil
 }
+
+//ftl:verb
+func PublishSlow(ctx context.Context, topic SlowTopic) error {
+	logger := ftl.LoggerFromContext(ctx)
+	t := time.Now()
+	logger.Infof("Publishing to slowTopic: %v", t)
+	return topic.Publish(ctx, PubSubEvent{
+		Time: t,
+	})
+}
diff --git a/backend/runner/pubsub/testdata/go/publisher/types.ftl.go b/backend/runner/pubsub/testdata/go/publisher/types.ftl.go
index d5ce050b7..e38f0d244 100644
--- a/backend/runner/pubsub/testdata/go/publisher/types.ftl.go
+++ b/backend/runner/pubsub/testdata/go/publisher/types.ftl.go
@@ -2,10 +2,10 @@
 package publisher
 
 import (
-	"context"
-	"github.com/block/ftl/common/reflection"
-	"github.com/block/ftl/go-runtime/ftl"
-	"github.com/block/ftl/go-runtime/server"
+    "context"
+    "github.com/block/ftl/common/reflection"
+    "github.com/block/ftl/go-runtime/ftl"
+    "github.com/block/ftl/go-runtime/server"
 )
 
 type LocalClient func(context.Context, PubSubEvent) error
@@ -14,6 +14,8 @@ type PublishOneClient func(context.Context) error
 
 type PublishOneToTopic2Client func(context.Context, PublishOneToTopic2Request) error
 
+type PublishSlowClient func(context.Context) error
+
 type PublishTenClient func(context.Context) error
 
 type PublishTenLocalClient func(context.Context) error
@@ -21,23 +23,27 @@ type PublishTenLocalClient func(context.Context) error
 func init() {
 	reflection.Register(
 		reflection.ProvideResourcesForVerb(
-			Local,
+            Local,
 		),
 		reflection.ProvideResourcesForVerb(
-			PublishOne,
+            PublishOne,
 			server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "testTopic"),
 		),
 		reflection.ProvideResourcesForVerb(
-			PublishOneToTopic2,
+            PublishOneToTopic2,
 			server.TopicHandle[PubSubEvent, ftl.SinglePartitionMap[PubSubEvent]]("publisher", "topic2"),
 		),
 		reflection.ProvideResourcesForVerb(
-			PublishTen,
+            PublishSlow,
+			server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "slowTopic"),
+		),
+		reflection.ProvideResourcesForVerb(
+            PublishTen,
 			server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "testTopic"),
 		),
 		reflection.ProvideResourcesForVerb(
-			PublishTenLocal,
+            PublishTenLocal,
 			server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "localTopic"),
 		),
 	)
-}
+}
\ No newline at end of file
diff --git a/backend/runner/pubsub/testdata/go/subscriber/subscriber.go b/backend/runner/pubsub/testdata/go/subscriber/subscriber.go
index 208acf47d..8a6dbe5d6 100644
--- a/backend/runner/pubsub/testdata/go/subscriber/subscriber.go
+++ b/backend/runner/pubsub/testdata/go/subscriber/subscriber.go
@@ -3,6 +3,7 @@ package subscriber
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	"ftl/publisher"
@@ -38,3 +39,16 @@ func PublishToExternalModule(ctx context.Context) error {
 	externalTopic := ftl.TopicHandle[publisher.PubSubEvent, ftl.SinglePartitionMap[publisher.PubSubEvent]]{Ref: reflection.Ref{Module: "publisher", Name: "testTopic"}.ToSchema()}
 	return externalTopic.Publish(ctx, publisher.PubSubEvent{Time: time.Now()})
 }
+
+//ftl:verb
+//ftl:subscribe publisher.slowTopic from=beginning
+func ConsumeSlow(ctx context.Context, req publisher.PubSubEvent) error {
+	versionDescription := "This deployment is TheFirstDeployment"
+	if strings.Contains(versionDescription, "TheFirstDeployment") {
+		ftl.LoggerFromContext(ctx).Infof("ConsumeSlow first deployment (will sleep 5s): %v", req.Time)
+		time.Sleep(5 * time.Second)
+		return nil
+	}
+	ftl.LoggerFromContext(ctx).Infof("ConsumeSlow second deployment (immediate): %v", req.Time)
+	return nil
+}
diff --git a/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go b/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go
index 47a3957bb..708227d14 100644
--- a/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go
+++ b/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go
@@ -13,6 +13,8 @@ type ConsumeButFailAndRetryClient func(context.Context, ftlpublisher.PubSubEvent
 
 type ConsumeFromLatestClient func(context.Context, ftlpublisher.PubSubEvent) error
 
+type ConsumeSlowClient func(context.Context, ftlpublisher.PubSubEvent) error
+
 type PublishToExternalModuleClient func(context.Context) error
 
 func init() {
@@ -26,6 +28,9 @@ func init() {
 		reflection.ProvideResourcesForVerb(
 			ConsumeFromLatest,
 		),
+		reflection.ProvideResourcesForVerb(
+			ConsumeSlow,
+		),
 		reflection.ProvideResourcesForVerb(
 			PublishToExternalModule,
 		),
diff --git a/internal/integration/harness.go b/internal/integration/harness.go
index c123de52c..327f9c89d 100644
--- a/internal/integration/harness.go
+++ b/internal/integration/harness.go
@@ -45,7 +45,7 @@ import (
 
 const dumpPath = "/tmp/ftl-kube-report"
 
-var redPandaBrokers = []string{"127.0.0.1:19092"}
+var RedPandaBrokers = []string{"127.0.0.1:19092"}
 
 func (i TestContext) integrationTestTimeout() time.Duration {
 	timeout := optional.Zero(os.Getenv("FTL_INTEGRATION_TEST_TIMEOUT")).Default("5s")
@@ -415,7 +415,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
 				err = exec.CommandWithEnv(ctx, log.Debug, rootDir, envars, "docker", "compose", "-f", "internal/dev/docker-compose.redpanda.yml", "-p", "ftl", "up", "-d", "--wait").RunBuffered(ctx)
 				assert.NoError(t, err)
 
-				client, err := sarama.NewClient(redPandaBrokers, sarama.NewConfig())
+				client, err := sarama.NewClient(RedPandaBrokers, sarama.NewConfig())
 				assert.NoError(t, err)
 				defer client.Close()