diff --git a/.changelog/40910.txt b/.changelog/40910.txt new file mode 100644 index 00000000000..4115bc07cfc --- /dev/null +++ b/.changelog/40910.txt @@ -0,0 +1,6 @@ +```release-note:bug +resource/aws_msk_cluster: Prevent persistent differences when `broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput` is unset +``` +```release-note:bug +resource/aws_msk_cluster: Properly disable provisioned throughput when a previously configured `broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput` block is removed +``` diff --git a/internal/service/kafka/cluster.go b/internal/service/kafka/cluster.go index 8a5c563c8cf..766209e6b1b 100644 --- a/internal/service/kafka/cluster.go +++ b/internal/service/kafka/cluster.go @@ -221,9 +221,10 @@ func resourceCluster() *schema.Resource { Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "provisioned_throughput": { - Type: schema.TypeList, - Optional: true, - MaxItems: 1, + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ // This feature is available for @@ -788,6 +789,9 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int if v, ok := d.GetOk("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { input.TargetBrokerEBSVolumeInfo[0].ProvisionedThroughput = expandProvisionedThroughput(v.([]interface{})[0].(map[string]interface{})) + } else if o, n := d.GetChange("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput"); len(o.([]interface{})) > 0 && len(n.([]interface{})) == 0 { + // Disable when a previously configured provisioned_throughput block is removed + input.TargetBrokerEBSVolumeInfo[0].ProvisionedThroughput = &types.ProvisionedThroughput{Enabled: aws.Bool(false)} } output, err := conn.UpdateBrokerStorage(ctx, input) diff --git a/internal/service/kafka/cluster_test.go b/internal/service/kafka/cluster_test.go index 2fe4bd7fd81..59aa874d024 100644 --- a/internal/service/kafka/cluster_test.go +++ b/internal/service/kafka/cluster_test.go @@ -204,13 +204,13 @@ func TestAccKafkaCluster_tags(t *testing.T) { }) } -func TestAccKafkaCluster_BrokerNodeGroupInfo_storageInfo(t *testing.T) { +func TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_disabledToEnabled(t *testing.T) { ctx := acctest.Context(t) var cluster1, cluster2 types.ClusterInfo rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) resourceName := "aws_msk_cluster.test" - original_volume_size := 11 - updated_volume_size := 112 + volumeSizeOriginal := 11 + volumeSizeUpdated := 112 resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { acctest.PreCheck(ctx, t); testAccPreCheck(ctx, t) }, @@ -219,13 +219,13 @@ func TestAccKafkaCluster_BrokerNodeGroupInfo_storageInfo(t *testing.T) { CheckDestroy: testAccCheckClusterDestroy(ctx), Steps: []resource.TestStep{ { - Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputNotEnabled(rName, original_volume_size, "kafka.m5.4xlarge"), + Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputNotEnabled(rName, volumeSizeOriginal, "kafka.m5.4xlarge"), Check: resource.ComposeAggregateTestCheckFunc( testAccCheckClusterExists(ctx, resourceName, &cluster1), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"), - resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(original_volume_size)), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(volumeSizeOriginal)), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.#", "1"), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.enabled", acctest.CtFalse), ), @@ -240,14 +240,14 @@ func TestAccKafkaCluster_BrokerNodeGroupInfo_storageInfo(t *testing.T) { }, { // update broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size - Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputEnabled(rName, updated_volume_size, "kafka.m5.4xlarge"), + Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputEnabled(rName, volumeSizeUpdated, "kafka.m5.4xlarge"), Check: resource.ComposeAggregateTestCheckFunc( testAccCheckClusterExists(ctx, resourceName, &cluster2), testAccCheckClusterNotRecreated(&cluster1, &cluster2), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"), - resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(updated_volume_size)), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(volumeSizeUpdated)), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.#", "1"), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.enabled", acctest.CtTrue), resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.volume_throughput", "250"), @@ -257,6 +257,109 @@ func TestAccKafkaCluster_BrokerNodeGroupInfo_storageInfo(t *testing.T) { }) } +func TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToDisabled(t *testing.T) { + ctx := acctest.Context(t) + var cluster1, cluster2 types.ClusterInfo + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_msk_cluster.test" + volumeSize := 20 + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(ctx, t); testAccPreCheck(ctx, t) }, + ErrorCheck: acctest.ErrorCheck(t, names.KafkaServiceID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckClusterDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputEnabled(rName, volumeSize, "kafka.m5.4xlarge"), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckClusterExists(ctx, resourceName, &cluster1), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(volumeSize)), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.enabled", acctest.CtTrue), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.volume_throughput", "250"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "current_version", + }, + }, + { + Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputNotEnabled(rName, volumeSize, "kafka.m5.4xlarge"), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckClusterExists(ctx, resourceName, &cluster2), + testAccCheckClusterNotRecreated(&cluster1, &cluster2), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(volumeSize)), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.enabled", acctest.CtFalse), + ), + }, + }, + }) +} + +// Ref: https://github.com/hashicorp/terraform-provider-aws/issues/26031 +func TestAccKafkaCluster_BrokerNodeGroupInfo_StorageInfo_enabledToUnset(t *testing.T) { + ctx := acctest.Context(t) + var cluster1, cluster2 types.ClusterInfo + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_msk_cluster.test" + volumeSize := 20 + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(ctx, t); testAccPreCheck(ctx, t) }, + ErrorCheck: acctest.ErrorCheck(t, names.KafkaServiceID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckClusterDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputEnabled(rName, volumeSize, "kafka.m5.4xlarge"), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckClusterExists(ctx, resourceName, &cluster1), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(volumeSize)), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.enabled", acctest.CtTrue), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.volume_throughput", "250"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "current_version", + }, + }, + { + Config: testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputUnset(rName, volumeSize, "kafka.m5.4xlarge"), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckClusterExists(ctx, resourceName, &cluster2), + testAccCheckClusterNotRecreated(&cluster1, &cluster2), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size", strconv.Itoa(volumeSize)), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.#", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput.0.enabled", acctest.CtFalse), + ), + }, + }, + }) +} + func TestAccKafkaCluster_BrokerNodeGroupInfo_instanceType(t *testing.T) { ctx := acctest.Context(t) var cluster1, cluster2 types.ClusterInfo @@ -1440,6 +1543,27 @@ resource "aws_msk_cluster" "test" { `, rName, ebsVolumeSize, instanceType)) } +func testAccClusterConfig_brokerNodeGroupInfoStorageInfoVolumeSizeSetAndProvThroughputUnset(rName string, ebsVolumeSize int, instanceType string) string { + return acctest.ConfigCompose(testAccClusterConfig_base(rName), fmt.Sprintf(` +resource "aws_msk_cluster" "test" { + cluster_name = %[1]q + kafka_version = "2.8.1" + number_of_broker_nodes = 3 + + broker_node_group_info { + client_subnets = aws_subnet.test[*].id + instance_type = %[3]q + security_groups = [aws_security_group.test.id] + storage_info { + ebs_storage_info { + volume_size = %[2]d + } + } + } +} +`, rName, ebsVolumeSize, instanceType)) +} + func testAccClusterConfig_brokerNodeGroupInfoInstanceType(rName string, t string) string { return acctest.ConfigCompose(testAccClusterConfig_base(rName), fmt.Sprintf(` resource "aws_msk_cluster" "test" {