Skip to content

Commit

Permalink
Reinitialise JSON converter for each index operation.
Browse files Browse the repository at this point in the history
This is kind of grasping at straws to fix the mysterious bug where
the index is populated with dodgy data seemingly from two items, but
it seems likely to be a reentrancy issue somewhere. This should prevent
any state in the JsonConverter lingering betwen ops.
  • Loading branch information
mikesname committed Jan 26, 2024
1 parent 08377c9 commit c3a180e
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ case class AkkaStreamsIndexMediatorHandle(
private val solrServiceConfig = ServiceConfig("solr", config)
private val solrBaseUrl: Uri = solrServiceConfig.baseUrl + "/update"
private val jsonSupport = EntityStreamingSupport.json(Integer.MAX_VALUE)
private val jsonConverter = new JsonConverter
private val mapper = new ObjectMapper
private val writer = mapper.writer().withDefaultPrettyPrinter()

Expand Down Expand Up @@ -97,7 +96,7 @@ case class AkkaStreamsIndexMediatorHandle(
.map(node => ByteString.fromArray(writer.writeValueAsBytes(node)))
.named("json-node-to-bytes")

private val jsonNodeToDoc: Flow[JsonNode, JsonNode, akka.NotUsed] = Flow[JsonNode]
private def jsonNodeToDoc(jsonConverter: JsonConverter): Flow[JsonNode, JsonNode, akka.NotUsed] = Flow[JsonNode]
.mapConcat[JsonNode](n => jsonConverter.convert(n).asScala.toVector)
.named("json-node-to-solr-doc")

Expand Down Expand Up @@ -138,6 +137,7 @@ case class AkkaStreamsIndexMediatorHandle(

val init = java.time.Instant.now()
var count = 0
val jsonConverter = new JsonConverter

val bytesFlow = Source(setCommonHeaders(requests))
.map { case (r, uri) =>
Expand All @@ -156,7 +156,7 @@ case class AkkaStreamsIndexMediatorHandle(
Source.future(s).flatMapConcat(n => n)
}
.flatMapConcat(n => n) // flatten the streams of JsonNodes into one stream
.via(jsonNodeToDoc)
.via(jsonNodeToDoc(jsonConverter))
.map { node =>
logger.trace(writer.writeValueAsString(node))
count = count + 1
Expand Down

0 comments on commit c3a180e

Please sign in to comment.