Skip to content

Commit

Permalink
r/aws_msk_cluster: fix provisioned_throughput block update behavior (
Browse files Browse the repository at this point in the history
…#40910)

This change includes two fixes for the `broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput` block:

1. The update method now includes logic to handle the case where the block previously existed but has been removed. Removal will cause the nested `enabled` value to be set to `false` in the call to the Update API, disabling provisioned throughput.
2. Differences between an unset `provisioned_throughput` block and the default value returned from AWS (a nested block with `enabled` set to `false`) will now be suppressed. This should prevent perpetual differences in configurations which choose to omit this block to disable it.

```console
% make testacc PKG=kafka TESTS=TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_
make: Verifying source code with gofmt...
==> Checking that code complies with gofmt requirements...
TF_ACC=1 go1.23.3 test ./internal/service/kafka/... -v -count 1 -parallel 20 -run='TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_'  -timeout 360m
2025/01/13 14:30:06 Initializing Terraform AWS Provider...

--- PASS: TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToUnset (2195.72s)
--- PASS: TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToDisabled (2200.75s)
--- PASS: TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_disabledToEnabled (2207.31s)
PASS
ok      github.com/hashicorp/terraform-provider-aws/internal/service/kafka      2213.740s
```

```console
% make testacc PKG=kafka TESTS=TestAccKafkaCluster_
make: Verifying source code with gofmt...
==> Checking that code complies with gofmt requirements...
TF_ACC=1 go1.23.3 test ./internal/service/kafka/... -v -count 1 -parallel 20 -run='TestAccKafkaCluster_'  -timeout 360m -vet=off
2025/01/13 15:31:15 Initializing Terraform AWS Provider...

--- PASS: TestAccKafkaCluster_EncryptionInfoEncryptionInTransit_inCluster (1903.67s)
=== CONT  TestAccKafkaCluster_EncryptionInfoEncryptionInTransit_clientBroker
--- PASS: TestAccKafkaCluster_basic (1918.03s)
=== CONT  TestAccKafkaCluster_EncryptionInfo_encryptionAtRestKMSKeyARN
--- PASS: TestAccKafkaCluster_enhancedMonitoring (2052.26s)
=== CONT  TestAccKafkaCluster_tags
--- PASS: TestAccKafkaCluster_ClientAuthenticationTLS_certificateAuthorityARNs (2115.51s)
=== CONT  TestAccKafkaCluster_disappears
--- PASS: TestAccKafkaCluster_openMonitoring (2158.66s)
=== CONT  TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_disabledToEnabled
--- PASS: TestAccKafkaCluster_loggingInfo (2195.49s)
--- PASS: TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToDisabled (2215.01s)
--- PASS: TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToUnset (2226.82s)
--- PASS: TestAccKafkaCluster_storageMode (2819.54s)
--- PASS: TestAccKafkaCluster_numberOfBrokerNodes (2823.48s)
--- PASS: TestAccKafkaCluster_Info_revision (2827.43s)
--- PASS: TestAccKafkaCluster_EncryptionInfo_encryptionAtRestKMSKeyARN (2089.17s)
--- PASS: TestAccKafkaCluster_EncryptionInfoEncryptionInTransit_clientBroker (2105.36s)
--- PASS: TestAccKafkaCluster_kafkaVersionDowngrade (4013.28s)
--- PASS: TestAccKafkaCluster_ClientAuthenticationSASL_iam (4014.32s)
--- PASS: TestAccKafkaCluster_tags (1962.95s)
--- PASS: TestAccKafkaCluster_ClientAuthenticationSASL_scram (4018.83s)
--- PASS: TestAccKafkaCluster_disappears (1907.78s)
--- PASS: TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_disabledToEnabled (1878.63s)
--- PASS: TestAccKafkaCluster_ClientAuthenticationTLS_initiallyNoAuthentication (4289.27s)
--- PASS: TestAccKafkaCluster_BrokerNodeGroupInfo_vpcConnectivity (5800.89s)
--- PASS: TestAccKafkaCluster_BrokerNodeGroupInfo_publicAccessSASLIAM (5961.32s)
--- PASS: TestAccKafkaCluster_kafkaVersionUpgradeWithInfo (7131.54s)
--- PASS: TestAccKafkaCluster_kafkaVersionUpgrade (7131.72s)
--- PASS: TestAccKafkaCluster_BrokerNodeGroupInfo_instanceType (7585.60s)
PASS
ok      github.com/hashicorp/terraform-provider-aws/internal/service/kafka      7590.972s
```
  • Loading branch information
jar-b authored Jan 15, 2025
1 parent cec320b commit 4066211
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 10 deletions.
6 changes: 6 additions & 0 deletions .changelog/40910.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
```release-note:bug
resource/aws_msk_cluster: Prevent persistent differences when `broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput` is unset
```
```release-note:bug
resource/aws_msk_cluster: Properly disable provisioned throughput when a previously configured `broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput` block is removed
```
10 changes: 7 additions & 3 deletions internal/service/kafka/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ func resourceCluster() *schema.Resource {
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"provisioned_throughput": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// This feature is available for
Expand Down Expand Up @@ -788,6 +789,9 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int

if v, ok := d.GetOk("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.TargetBrokerEBSVolumeInfo[0].ProvisionedThroughput = expandProvisionedThroughput(v.([]interface{})[0].(map[string]interface{}))
} else if o, n := d.GetChange("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput"); len(o.([]interface{})) > 0 && len(n.([]interface{})) == 0 {
// Disable when a previously configured provisioned_throughput block is removed
input.TargetBrokerEBSVolumeInfo[0].ProvisionedThroughput = &types.ProvisionedThroughput{Enabled: aws.Bool(false)}
}

output, err := conn.UpdateBrokerStorage(ctx, input)
Expand Down
138 changes: 131 additions & 7 deletions internal/service/kafka/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ func TestAccKafkaCluster_tags(t *testing.T) {
})
}

func TestAccKafkaCluster_BrokerNodeGroupInfo_storageInfo(t *testing.T) {
func TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_disabledToEnabled(t *testing.T) {
ctx := acctest.Context(t)
var cluster1, cluster2 types.ClusterInfo
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_msk_cluster.test"
original_volume_size := 11
updated_volume_size := 112
volumeSizeOriginal := 11
volumeSizeUpdated := 112

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t); testAccPreCheck(ctx, t) },
Expand All @@ -219,13 +219,13 @@ func TestAccKafkaCluster_BrokerNodeGroupInfo_storageInfo(t *testing.T) {
CheckDestroy: testAccCheckClusterDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputNotEnabled(rName, original_volume_size, "kafka.m5.4xlarge"),
Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputNotEnabled(rName, volumeSizeOriginal, "kafka.m5.4xlarge"),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterExists(ctx, resourceName, &cluster1),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(original_volume_size)),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(volumeSizeOriginal)),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.enabled", acctest.CtFalse),
),
Expand All @@ -240,14 +240,14 @@ func TestAccKafkaCluster_BrokerNodeGroupInfo_storageInfo(t *testing.T) {
},
{
// update broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size
Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputEnabled(rName, updated_volume_size, "kafka.m5.4xlarge"),
Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputEnabled(rName, volumeSizeUpdated, "kafka.m5.4xlarge"),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterExists(ctx, resourceName, &cluster2),
testAccCheckClusterNotRecreated(&cluster1, &cluster2),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(updated_volume_size)),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(volumeSizeUpdated)),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.enabled", acctest.CtTrue),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.volume_throughput", "250"),
Expand All @@ -257,6 +257,109 @@ func TestAccKafkaCluster_BrokerNodeGroupInfo_storageInfo(t *testing.T) {
})
}

// TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToDisabled verifies that a
// cluster created with provisioned throughput enabled can be updated in place
// (no replacement) to an explicitly disabled provisioned_throughput block.
func TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToDisabled(t *testing.T) {
	ctx := acctest.Context(t)
	var before, after types.ClusterInfo
	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
	resourceName := "aws_msk_cluster.test"
	ebsVolumeSize := 20

	// Attribute path prefixes shared by the step checks below.
	ebsAttr := "broker_node_group_info.0.storage_info.0.ebs_storage_info.0"
	ptAttr := ebsAttr + ".provisioned_throughput"

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:                 func() { acctest.PreCheck(ctx, t); testAccPreCheck(ctx, t) },
		ErrorCheck:               acctest.ErrorCheck(t, names.KafkaServiceID),
		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
		CheckDestroy:             testAccCheckClusterDestroy(ctx),
		Steps: []resource.TestStep{
			{
				// Create with provisioned throughput enabled.
				Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputEnabled(rName, ebsVolumeSize, "kafka.m5.4xlarge"),
				Check: resource.ComposeAggregateTestCheckFunc(
					testAccCheckClusterExists(ctx, resourceName, &before),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, ebsAttr+".volume_size", strconv.Itoa(ebsVolumeSize)),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".#", "1"),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".0.enabled", acctest.CtTrue),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".0.volume_throughput", "250"),
				),
			},
			{
				ResourceName:      resourceName,
				ImportState:       true,
				ImportStateVerify: true,
				ImportStateVerifyIgnore: []string{
					"current_version",
				},
			},
			{
				// Update to an explicit provisioned_throughput block with enabled = false.
				Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputNotEnabled(rName, ebsVolumeSize, "kafka.m5.4xlarge"),
				Check: resource.ComposeAggregateTestCheckFunc(
					testAccCheckClusterExists(ctx, resourceName, &after),
					testAccCheckClusterNotRecreated(&before, &after),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, ebsAttr+".volume_size", strconv.Itoa(ebsVolumeSize)),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".#", "1"),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".0.enabled", acctest.CtFalse),
				),
			},
		},
	})
}

// Ref: https://github.com/hashicorp/terraform-provider-aws/issues/26031
//
// TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToUnset verifies that removing a
// previously enabled provisioned_throughput block entirely disables provisioned
// throughput in place: state retains a single nested block with enabled = false
// and the cluster is not recreated.
func TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToUnset(t *testing.T) {
	ctx := acctest.Context(t)
	var before, after types.ClusterInfo
	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
	resourceName := "aws_msk_cluster.test"
	ebsVolumeSize := 20

	// Attribute path prefixes shared by the step checks below.
	ebsAttr := "broker_node_group_info.0.storage_info.0.ebs_storage_info.0"
	ptAttr := ebsAttr + ".provisioned_throughput"

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:                 func() { acctest.PreCheck(ctx, t); testAccPreCheck(ctx, t) },
		ErrorCheck:               acctest.ErrorCheck(t, names.KafkaServiceID),
		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
		CheckDestroy:             testAccCheckClusterDestroy(ctx),
		Steps: []resource.TestStep{
			{
				// Create with provisioned throughput enabled.
				Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputEnabled(rName, ebsVolumeSize, "kafka.m5.4xlarge"),
				Check: resource.ComposeAggregateTestCheckFunc(
					testAccCheckClusterExists(ctx, resourceName, &before),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, ebsAttr+".volume_size", strconv.Itoa(ebsVolumeSize)),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".#", "1"),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".0.enabled", acctest.CtTrue),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".0.volume_throughput", "250"),
				),
			},
			{
				ResourceName:      resourceName,
				ImportState:       true,
				ImportStateVerify: true,
				ImportStateVerifyIgnore: []string{
					"current_version",
				},
			},
			{
				// Update to a configuration that omits provisioned_throughput entirely;
				// the provider should disable it rather than leave it enabled.
				Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputUnset(rName, ebsVolumeSize, "kafka.m5.4xlarge"),
				Check: resource.ComposeAggregateTestCheckFunc(
					testAccCheckClusterExists(ctx, resourceName, &after),
					testAccCheckClusterNotRecreated(&before, &after),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"),
					resource.TestCheckResourceAttr(resourceName, ebsAttr+".volume_size", strconv.Itoa(ebsVolumeSize)),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".#", "1"),
					resource.TestCheckResourceAttr(resourceName, ptAttr+".0.enabled", acctest.CtFalse),
				),
			},
		},
	})
}

func TestAccKafkaCluster_BrokerNodeGroupInfo_instanceType(t *testing.T) {
ctx := acctest.Context(t)
var cluster1, cluster2 types.ClusterInfo
Expand Down Expand Up @@ -1440,6 +1543,27 @@ resource "aws_msk_cluster" "test" {
`, rName, ebsVolumeSize, instanceType))
}

// testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputUnset returns a
// cluster configuration whose ebs_storage_info block sets only volume_size,
// omitting the provisioned_throughput block entirely.
func testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputUnset(rName string, ebsVolumeSize int, instanceType string) string {
	const tmpl = `
resource "aws_msk_cluster" "test" {
  cluster_name           = %[1]q
  kafka_version          = "2.8.1"
  number_of_broker_nodes = 3

  broker_node_group_info {
    client_subnets  = aws_subnet.test[*].id
    instance_type   = %[3]q
    security_groups = [aws_security_group.test.id]

    storage_info {
      ebs_storage_info {
        volume_size = %[2]d
      }
    }
  }
}
`
	return acctest.ConfigCompose(testAccClusterConfig_base(rName), fmt.Sprintf(tmpl, rName, ebsVolumeSize, instanceType))
}

func testAccClusterConfig_brokerNodeGroupInfoInstanceType(rName string, t string) string {
return acctest.ConfigCompose(testAccClusterConfig_base(rName), fmt.Sprintf(`
resource "aws_msk_cluster" "test" {
Expand Down

0 comments on commit 4066211

Please sign in to comment.