Skip to content

Commit

Permalink
Revert "EnhancedMonitoring and FailedRecordCount outputs should be nu…
Browse files Browse the repository at this point in the history
…llable (#418)"

This reverts commit ba6ee35.
  • Loading branch information
etspaceman committed Feb 8, 2023
1 parent df35b44 commit d637349
Show file tree
Hide file tree
Showing 20 changed files with 96 additions and 99 deletions.
8 changes: 8 additions & 0 deletions src/fun/scala/kinesis/mock/DescribeStreamSummaryTests.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kinesis.mock

import java.util.Collections

import cats.syntax.all._
import software.amazon.awssdk.services.kinesis.model._

Expand All @@ -12,6 +14,12 @@ class DescribeStreamSummaryTests extends AwsFunctionalTests {
.builder()
.consumerCount(0)
.encryptionType(EncryptionType.NONE)
.enhancedMonitoring(
EnhancedMetrics
.builder()
.shardLevelMetricsWithStrings(Collections.emptyList[String]())
.build()
)
.openShardCount(genStreamShardCount)
.retentionPeriodHours(24)
.streamARN(
Expand Down
8 changes: 8 additions & 0 deletions src/fun/scala/kinesis/mock/DescribeStreamTests.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kinesis.mock

import java.util.Collections

import software.amazon.awssdk.services.kinesis.model._

import kinesis.mock.syntax.javaFuture._
Expand All @@ -19,6 +21,12 @@ class DescribeStreamTests extends AwsFunctionalTests {
expected = StreamDescription
.builder()
.encryptionType(EncryptionType.NONE)
.enhancedMonitoring(
EnhancedMetrics
.builder()
.shardLevelMetricsWithStrings(Collections.emptyList[String]())
.build()
)
.hasMoreShards(false)
.shards(res.streamDescription().shards())
.retentionPeriodHours(24)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/kinesis/mock/api/DeleteStreamRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ final case class DeleteStreamRequest(
shards = SortedMap.empty,
streamStatus = StreamStatus.DELETING,
tags = Tags.empty,
enhancedMonitoring = None,
enhancedMonitoring = Vector.empty,
consumers = SortedMap.empty
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ final case class DisableEnhancedMonitoringRequest(
.flatMap(_ => CommonValidations.findStream(arn, streams))
.map { stream =>
val current =
stream.enhancedMonitoring.map(_.flatMap(_.shardLevelMetrics))
val desired: Option[Vector[ShardLevelMetric]] =
stream.enhancedMonitoring.flatMap(_.shardLevelMetrics)
val desired =
if (shardLevelMetrics.contains(ShardLevelMetric.ALL))
None
else current.map(_.diff(shardLevelMetrics))
Vector.empty
else current.diff(shardLevelMetrics)

(
streams.updateStream(
stream
.copy(enhancedMonitoring =
desired.map(x => Vector(ShardLevelMetrics(x)))
Vector(ShardLevelMetrics(desired))
)
),
DisableEnhancedMonitoringResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import io.circe
import kinesis.mock.models._

final case class DisableEnhancedMonitoringResponse(
currentShardLevelMetrics: Option[Vector[ShardLevelMetric]],
desiredShardLevelMetrics: Option[Vector[ShardLevelMetric]],
currentShardLevelMetrics: Vector[ShardLevelMetric],
desiredShardLevelMetrics: Vector[ShardLevelMetric],
streamName: StreamName,
streamArn: StreamArn
)
Expand All @@ -35,10 +35,10 @@ object DisableEnhancedMonitoringResponse {
for {
currentShardLevelMetrics <- x
.downField("CurrentShardLevelMetrics")
.as[Option[Vector[ShardLevelMetric]]]
.as[Vector[ShardLevelMetric]]
desiredShardLevelMetrics <- x
.downField("DesiredShardLevelMetrics")
.as[Option[Vector[ShardLevelMetric]]]
.as[Vector[ShardLevelMetric]]
streamName <- x.downField("StreamName").as[StreamName]
streamArn <- x.downField("StreamARN").as[StreamArn]
} yield DisableEnhancedMonitoringResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,18 @@ final case class EnableEnhancedMonitoringRequest(
.flatMap(_ => CommonValidations.findStream(arn, streams))
.map { stream =>
val current =
stream.enhancedMonitoring.map(_.flatMap(_.shardLevelMetrics))
stream.enhancedMonitoring.flatMap(_.shardLevelMetrics)
val desired =
if (shardLevelMetrics.contains(ShardLevelMetric.ALL))
Some(
ShardLevelMetric.values
.filterNot(_ == ShardLevelMetric.ALL)
.toVector
)
else
(current
.map(x => (x ++ shardLevelMetrics).distinct)
.orElse(Some(shardLevelMetrics)))
ShardLevelMetric.values
.filterNot(_ == ShardLevelMetric.ALL)
.toVector
else (current ++ shardLevelMetrics).distinct
(
streams.updateStream(
stream
.copy(enhancedMonitoring =
desired.map(x => Vector(ShardLevelMetrics(x)))
Vector(ShardLevelMetrics(desired))
)
),
EnableEnhancedMonitoringResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import io.circe
import kinesis.mock.models._

final case class EnableEnhancedMonitoringResponse(
currentShardLevelMetrics: Option[Vector[ShardLevelMetric]],
desiredShardLevelMetrics: Option[Vector[ShardLevelMetric]],
currentShardLevelMetrics: Vector[ShardLevelMetric],
desiredShardLevelMetrics: Vector[ShardLevelMetric],
streamName: StreamName,
streamArn: StreamArn
)
Expand All @@ -35,10 +35,10 @@ object EnableEnhancedMonitoringResponse {
for {
currentShardLevelMetrics <- x
.downField("CurrentShardLevelMetrics")
.as[Option[Vector[ShardLevelMetric]]]
.as[Vector[ShardLevelMetric]]
desiredShardLevelMetrics <- x
.downField("DesiredShardLevelMetrics")
.as[Option[Vector[ShardLevelMetric]]]
.as[Vector[ShardLevelMetric]]
streamName <- x.downField("StreamName").as[StreamName]
streamArn <- x.downField("StreamARN").as[StreamArn]
} yield EnableEnhancedMonitoringResponse(
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/kinesis/mock/api/PutRecordsRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ final case class PutRecordsRequest(
),
PutRecordsResponse(
stream.encryptionType,
None,
0,
asRecords.map { case (_, _, _, entry) =>
entry
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/kinesis/mock/api/PutRecordsResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import kinesis.mock.models.EncryptionType

final case class PutRecordsResponse(
encryptionType: EncryptionType,
failedRecordCount: Option[Int],
failedRecordCount: Int,
records: Vector[PutRecordsResultEntry]
)

Expand All @@ -26,7 +26,7 @@ object PutRecordsResponse {
x =>
for {
encryptionType <- x.downField("EncryptionType").as[EncryptionType]
failedRecordCount <- x.downField("FailedRecordCount").as[Option[Int]]
failedRecordCount <- x.downField("FailedRecordCount").as[Int]
records <- x.downField("Records").as[Vector[PutRecordsResultEntry]]
} yield PutRecordsResponse(encryptionType, failedRecordCount, records)

Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/kinesis/mock/models/StreamData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import kinesis.mock.instances.circe._
final case class StreamData(
consumers: SortedMap[ConsumerName, Consumer],
encryptionType: EncryptionType,
enhancedMonitoring: Option[Vector[ShardLevelMetrics]],
enhancedMonitoring: Vector[ShardLevelMetrics],
keyId: Option[String],
retentionPeriod: FiniteDuration,
shards: SortedMap[Shard, Vector[KinesisRecord]],
Expand Down Expand Up @@ -81,7 +81,7 @@ object StreamData {
encryptionType <- x.downField("encryptionType").as[EncryptionType]
enhancedMonitoring <- x
.downField("enhancedMonitoring")
.as[Option[Vector[ShardLevelMetrics]]]
.as[Vector[ShardLevelMetrics]]
keyId <- x.downField("keyId").as[Option[String]]
retentionPeriod <- x.downField("retentionPeriod").as[FiniteDuration]
shards <- x
Expand Down Expand Up @@ -144,7 +144,7 @@ object StreamData {
StreamData(
SortedMap.empty,
EncryptionType.NONE,
None,
Vector(ShardLevelMetrics(Vector.empty)),
None,
minRetentionPeriod,
shards,
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/kinesis/mock/models/StreamDescription.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import kinesis.mock.instances.circe._

final case class StreamDescription(
encryptionType: Option[EncryptionType],
enhancedMonitoring: Option[Vector[ShardLevelMetrics]],
enhancedMonitoring: Vector[ShardLevelMetrics],
hasMoreShards: Boolean,
keyId: Option[String],
retentionPeriodHours: Int,
Expand Down Expand Up @@ -100,7 +100,7 @@ object StreamDescription {
.as[Option[EncryptionType]]
enhancedMonitoring <- x
.downField("EnhancedMonitoring")
.as[Option[Vector[ShardLevelMetrics]]]
.as[Vector[ShardLevelMetrics]]
hasMoreShards <- x.downField("HasMoreShards").as[Boolean]
keyId <- x.downField("KeyId").as[Option[String]]
retentionPeriodHours <- x.downField("RetentionPeriodHours").as[Int]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import kinesis.mock.instances.circe._
final case class StreamDescriptionSummary(
consumerCount: Option[Int],
encryptionType: Option[EncryptionType],
enhancedMonitoring: Option[Vector[ShardLevelMetrics]],
enhancedMonitoring: Vector[ShardLevelMetrics],
keyId: Option[String],
openShardCount: Int,
retentionPeriodHours: Int,
Expand Down Expand Up @@ -76,7 +76,7 @@ object StreamDescriptionSummary {
encryptionType <- x.downField("EncryptionType").as[Option[EncryptionType]]
enhancedMonitoring <- x
.downField("EnhancedMonitoring")
.as[Option[Vector[ShardLevelMetrics]]]
.as[Vector[ShardLevelMetrics]]
keyId <- x.downField("KeyId").as[Option[String]]
openShardCount <- x.downField("OpenShardCount").as[Int]
retentionPeriodHours <- x.downField("RetentionPeriodHours").as[Int]
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/kinesis/mock/models/Streams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ final case class Streams(streams: SortedMap[StreamArn, StreamData]) {
shards = SortedMap.empty,
streamStatus = StreamStatus.DELETING,
tags = Tags.empty,
enhancedMonitoring = None,
enhancedMonitoring = Vector.empty,
consumers = SortedMap.empty
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,16 @@ class DisableEnhancedMonitoringTests

val updated = streams.findAndUpdateStream(streamArn)(stream =>
stream.copy(enhancedMonitoring =
Some(
Vector(
ShardLevelMetrics(
Vector(
ShardLevelMetric.IncomingBytes,
ShardLevelMetric.IncomingRecords,
ShardLevelMetric.OutgoingBytes,
ShardLevelMetric.OutgoingRecords,
ShardLevelMetric.WriteProvisionedThroughputExceeded,
ShardLevelMetric.ReadProvisionedThroughputExceeded,
ShardLevelMetric.IteratorAgeMilliseconds
)
Vector(
ShardLevelMetrics(
Vector(
ShardLevelMetric.IncomingBytes,
ShardLevelMetric.IncomingRecords,
ShardLevelMetric.OutgoingBytes,
ShardLevelMetric.OutgoingRecords,
ShardLevelMetric.WriteProvisionedThroughputExceeded,
ShardLevelMetric.ReadProvisionedThroughputExceeded,
ShardLevelMetric.IteratorAgeMilliseconds
)
)
)
Expand All @@ -54,7 +52,7 @@ class DisableEnhancedMonitoringTests
s <- streamsRef.get
updatedMetrics = s.streams
.get(streamArn)
.map(_.enhancedMonitoring.map(_.flatMap(_.shardLevelMetrics)))
.map(_.enhancedMonitoring.flatMap(_.shardLevelMetrics))

} yield assert(
res.isRight && res.exists { case response =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class EnableEnhancedMonitoringTests
s <- streamsRef.get
updatedMetrics = s.streams
.get(streamArn)
.map(_.enhancedMonitoring.map(_.flatMap(_.shardLevelMetrics)))
.map(_.enhancedMonitoring.flatMap(_.shardLevelMetrics))

} yield assert(
res.isRight && res.exists { case response =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class DescribeStreamSummaryTests
expected = StreamDescriptionSummary(
Some(0),
Some(EncryptionType.NONE),
None,
Vector(ShardLevelMetrics(Vector.empty)),
None,
1,
24,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class DescribeStreamTests
.map(x => x.shards)
expected = StreamDescription(
Some(EncryptionType.NONE),
None,
Vector(ShardLevelMetrics(Vector.empty)),
false,
None,
24,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ class DisableEnhancedMonitoringTests
.rethrow
.map(
_.streamDescriptionSummary.enhancedMonitoring
.map(_.flatMap(_.shardLevelMetrics))
.flatMap(_.shardLevelMetrics)
)
} yield assert(
res.desiredShardLevelMetrics == streamMonitoring && !res.desiredShardLevelMetrics
.exists(_.contains(ShardLevelMetric.IncomingBytes))
.contains(ShardLevelMetric.IncomingBytes)
)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ class EnableEnhancedMonitoringTests
.rethrow
.map(
_.streamDescriptionSummary.enhancedMonitoring
.map(_.flatMap(_.shardLevelMetrics))
.flatMap(_.shardLevelMetrics)
)
} yield assert(
res.desiredShardLevelMetrics == streamMonitoring && res.desiredShardLevelMetrics
.exists(_.contains(ShardLevelMetric.IncomingBytes))
.contains(ShardLevelMetric.IncomingBytes)
)
})
}
Loading

0 comments on commit d637349

Please sign in to comment.