Skip to content

Commit

Permalink
Use ShardSummary instead of Shard in ListShardsResponse, CalculateSha…
Browse files Browse the repository at this point in the history
…rd fixes (#32)

* Use ShardSummary instead of Shard in ListShardsResponse

* fix tests

* Fixes to calculateShard
  • Loading branch information
etspaceman authored Apr 15, 2021
1 parent 55f7150 commit 80df870
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 53 deletions.
4 changes: 2 additions & 2 deletions src/main/scala/kinesis/mock/api/ListShardsRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ final case class ListShardsRequest(
ListShardsRequest
.createNextToken(streamName, shards.last.shardId.shardId)
)
ListShardsResponse(nextToken, shards)
ListShardsResponse(nextToken, shards.map(ShardSummary.fromShard))
})
}
case (_, None, _, _, Some(sName)) =>
Expand Down Expand Up @@ -150,7 +150,7 @@ final case class ListShardsRequest(
ListShardsRequest
.createNextToken(sName, shards.last.shardId.shardId)
)
ListShardsResponse(nextToken, shards)
ListShardsResponse(nextToken, shards.map(ShardSummary.fromShard))
})
)
case (_, None, _, _, None) =>
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/kinesis/mock/api/ListShardsResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import kinesis.mock.models._

final case class ListShardsResponse(
nextToken: Option[String],
shards: List[Shard]
shards: List[ShardSummary]
)

object ListShardsResponse {
Expand All @@ -18,7 +18,7 @@ object ListShardsResponse {
x =>
for {
nextToken <- x.downField("NextToken").as[Option[String]]
shards <- x.downField("Shards").as[List[Shard]]
shards <- x.downField("Shards").as[List[ShardSummary]]
} yield ListShardsResponse(nextToken, shards)
implicit val listShardsResponseEq: Eq[ListShardsResponse] =
(x, y) => x.nextToken == y.nextToken && x.shards === y.shards
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/kinesis/mock/models/ShardSummary.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ final case class ShardSummary(
parentShardId: Option[String],
sequenceNumberRange: SequenceNumberRange,
shardId: String
)
) {
val isOpen: Boolean = sequenceNumberRange.endingSequenceNumber.isEmpty
}

object ShardSummary {
def fromShard(shard: Shard): ShardSummary = ShardSummary(
Expand Down
60 changes: 32 additions & 28 deletions src/main/scala/kinesis/mock/validations/CommonValidations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -316,36 +316,40 @@ object CommonValidations {
explicitHashKey: Option[String],
stream: StreamData
): ValidatedNel[KinesisMockException, (Shard, List[KinesisRecord])] = {
Try(
Md5Utils.computeMD5Hash(
explicitHashKey
.getOrElse(partitionKey)
.getBytes(StandardCharsets.US_ASCII)
)
).toValidated
.leftMap(e =>
NonEmptyList.one(
InvalidArgumentException(
s"Could not compute MD5 hash, ${e.getMessage}"
)
)
)
.andThen { hashBytes =>
val hashInt = BigInt.apply(1, hashBytes)

stream.shards
.collectFirst {
case (shard, data)
if hashInt >= shard.hashKeyRange.startingHashKey && hashInt <= shard.hashKeyRange.endingHashKey =>
(shard, data)
} match {
case None =>
InvalidArgumentException(
"Could not find shard for partitionKey"
).invalidNel
case Some(x) => Valid(x)
(explicitHashKey match {
case Some(ehk) =>
val hash = BigInt(ehk)
if (hash < Shard.minHashKey || hash > Shard.maxHashKey) {
InvalidArgumentException("ExplicitHashKey is not valid").invalidNel
} else {
hash.validNel
}
case None =>
Try(
Md5Utils.computeMD5Hash(partitionKey.getBytes(StandardCharsets.UTF_8))
).toValidated.bimap(
e =>
NonEmptyList.one(
InvalidArgumentException(
s"Could not compute MD5 hash, ${e.getMessage}"
)
),
x => BigInt(1, x)
)
}).andThen { hashInt =>
stream.shards
.collectFirst {
case (shard, data)
if shard.isOpen && hashInt >= shard.hashKeyRange.startingHashKey && hashInt <= shard.hashKeyRange.endingHashKey =>
(shard, data)
} match {
case None =>
InvalidArgumentException(
"Could not find shard for partitionKey"
).invalidNel
case Some(x) => Valid(x)
}
}
}

def validateExplicitHashKey(
Expand Down
36 changes: 26 additions & 10 deletions src/test/scala/kinesis/mock/api/ListShardsTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ListShardsTests extends munit.ScalaCheckSuite {

(res.isValid && res.exists { response =>
streams.streams.get(streamName).exists { s =>
s.shards.keys.toList == response.shards
s.shards.keys.toList.map(ShardSummary.fromShard) == response.shards
}
}) :| s"req: $req\nres: $res"
})
Expand All @@ -50,11 +50,15 @@ class ListShardsTests extends munit.ScalaCheckSuite {

(res.isValid && paginatedRes.isValid && res.exists { response =>
streams.streams.get(streamName).exists { s =>
s.shards.keys.toList.take(50) == response.shards
s.shards.keys.toList
.take(50)
.map(ShardSummary.fromShard) == response.shards
}
} && paginatedRes.exists { response =>
streams.streams.get(streamName).exists { s =>
s.shards.keys.toList.takeRight(50) == response.shards
s.shards.keys.toList
.takeRight(50)
.map(ShardSummary.fromShard) == response.shards
}
}) :| s"req: $req\n" +
s"resCount: ${res.map(_.shards.length)}\n" +
Expand Down Expand Up @@ -86,7 +90,9 @@ class ListShardsTests extends munit.ScalaCheckSuite {

(res.isValid && res.exists { response =>
streams.streams.get(streamName).exists { s =>
s.shards.keys.toList.takeRight(89) == response.shards
s.shards.keys.toList
.takeRight(89)
.map(ShardSummary.fromShard) == response.shards
}
}) :| s"req: $req\nres: $res"
})
Expand Down Expand Up @@ -130,7 +136,9 @@ class ListShardsTests extends munit.ScalaCheckSuite {

(res.isValid && res.exists { response =>
updated.streams.get(streamName).exists { s =>
s.shards.keys.toList.takeRight(95) == response.shards
s.shards.keys.toList
.takeRight(95)
.map(ShardSummary.fromShard) == response.shards
}
}) :| s"req: $req\n" +
s"res: ${res.map(_.shards.length)}\n" +
Expand Down Expand Up @@ -176,7 +184,7 @@ class ListShardsTests extends munit.ScalaCheckSuite {

(res.isValid && res.exists { response =>
updated.streams.get(streamName).exists { s =>
s.shards.keys.toList == response.shards
s.shards.keys.toList.map(ShardSummary.fromShard) == response.shards
}
}) :| s"req: $req\n" +
s"res: ${res.map(_.shards.length)}\n" +
Expand Down Expand Up @@ -225,7 +233,9 @@ class ListShardsTests extends munit.ScalaCheckSuite {

(res.isValid && res.exists { response =>
updated.streams.get(streamName).exists { s =>
s.shards.keys.toList.takeRight(95) == response.shards
s.shards.keys.toList
.takeRight(95)
.map(ShardSummary.fromShard) == response.shards
}
}) :| s"req: $req\n" +
s"res: ${res.map(_.shards.length)}\n" +
Expand Down Expand Up @@ -258,7 +268,9 @@ class ListShardsTests extends munit.ScalaCheckSuite {

(res.isValid && res.exists { response =>
streams.streams.get(streamName).exists { s =>
s.shards.keys.toList.takeRight(95) == response.shards
s.shards.keys.toList
.takeRight(95)
.map(ShardSummary.fromShard) == response.shards
}
}) :| s"req: $req\n" +
s"resLen: ${res.map(_.shards.length)}\n" +
Expand Down Expand Up @@ -327,7 +339,9 @@ class ListShardsTests extends munit.ScalaCheckSuite {

(res.isValid && res.exists { response =>
updated.streams.get(streamName).exists { s =>
s.shards.keys.toList.takeRight(95) == response.shards
s.shards.keys.toList
.takeRight(95)
.map(ShardSummary.fromShard) == response.shards
}
}) :| s"req: $req\n" +
s"res: ${res.map(_.shards.length)}\n" +
Expand Down Expand Up @@ -395,7 +409,9 @@ class ListShardsTests extends munit.ScalaCheckSuite {

(res.isValid && res.exists { response =>
updated.streams.get(streamName).exists { s =>
s.shards.keys.toList.takeRight(95) == response.shards
s.shards.keys.toList
.takeRight(95)
.map(ShardSummary.fromShard) == response.shards
}
}) :| s"req: $req\n" +
s"res: ${res.map(_.shards.length)}\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DescribeStreamTests
context
)
.rethrow
.map(x => x.shards.map(ShardSummary.fromShard))
.map(x => x.shards)
expected = StreamDescription(
Some(EncryptionType.NONE),
List(ShardLevelMetrics(List.empty)),
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/kinesis/mock/cache/GetRecordsTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class GetRecordsTests
shardIterator <- cache
.getShardIterator(
GetShardIteratorRequest(
shard.shardId.shardId,
shard.shardId,
ShardIteratorType.TRIM_HORIZON,
None,
streamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GetShardIteratorTests
res <- cache
.getShardIterator(
GetShardIteratorRequest(
shard.shardId.shardId,
shard.shardId,
ShardIteratorType.TRIM_HORIZON,
None,
streamName,
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/kinesis/mock/cache/PutRecordTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PutRecordTests
shardIterator <- cache
.getShardIterator(
GetShardIteratorRequest(
shard.shardId.shardId,
shard.shardId,
ShardIteratorType.TRIM_HORIZON,
None,
streamName,
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/kinesis/mock/cache/PutRecordsTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PutRecordsTests
shardIterator <- cache
.getShardIterator(
GetShardIteratorRequest(
shard.shardId.shardId,
shard.shardId,
ShardIteratorType.TRIM_HORIZON,
None,
streamName,
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/kinesis/mock/cache/SplitShardTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class SplitShardTests
.splitShard(
SplitShardRequest(
(shardToSplit.hashKeyRange.endingHashKey / BigInt(2)).toString,
shardToSplit.shardId.shardId,
shardToSplit.shardId,
streamName
),
context
Expand All @@ -68,7 +68,7 @@ class SplitShardTests
checkStream2.streamDescriptionSummary.streamStatus == StreamStatus.ACTIVE &&
checkShards.shards.count(!_.isOpen) == 1 &&
checkShards.shards.count(shard =>
shard.parentShardId.contains(shardToSplit.shardId.shardId)
shard.parentShardId.contains(shardToSplit.shardId)
) == 2 && checkShards.shards.length == 7,
s"${checkShards.shards.mkString("\n\t")}\n" +
s"$checkStream1\n" +
Expand Down
10 changes: 7 additions & 3 deletions src/test/scala/kinesis/mock/instances/arbitrary.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ object arbitrary {
shard
)

def shardSummaryGen(shardIndex: Int): Gen[ShardSummary] =
shardGen(shardIndex).map(ShardSummary.fromShard)

implicit val shardArbitrary: Arbitrary[Shard] = Arbitrary(
Gen.choose(100, 1000).flatMap(index => shardGen(index))
)
Expand Down Expand Up @@ -610,8 +613,8 @@ object arbitrary {
implicit val listShardsResponseArb: Arbitrary[ListShardsResponse] = Arbitrary(
for {
nextToken <- Gen.option(nextTokenGen(None))
shards <- Gen.sequence[List[Shard], Shard](
List.range(0, 100).map(x => shardGen(x))
shards <- Gen.sequence[List[ShardSummary], ShardSummary](
List.range(0, 100).map(x => shardSummaryGen(x))
)
} yield ListShardsResponse(nextToken, shards)
)
Expand Down Expand Up @@ -687,7 +690,8 @@ object arbitrary {
)
)

val explicitHashKeyGen: Gen[String] = RegexpGen.from("0|([1-9]\\d{0,38})")
val explicitHashKeyGen: Gen[String] =
Gen.choose(Shard.minHashKey, Shard.maxHashKey).map(_.toString)
val partitionKeyGen: Gen[String] =
Gen.choose(1, 256).flatMap(size => Gen.stringOfN(size, Gen.alphaNumChar))

Expand Down

0 comments on commit 80df870

Please sign in to comment.