From c3a180e2656371e7693ce138f04a1c67e19b8ea8 Mon Sep 17 00:00:00 2001 From: Mike Bryant Date: Fri, 26 Jan 2024 10:24:09 +0000 Subject: [PATCH] Reinitialise JSON converter for each index operation. This is kind of grasping at straws to fix the mysterious bug where the index is populated with dodgy data seemingly from two items, but it seems likely to be a reentrancy issue somewhere. This should prevent any state in the JsonConverter lingering betwen ops. --- .../app/services/search/AkkaStreamsIndexMediator.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/portal/app/services/search/AkkaStreamsIndexMediator.scala b/modules/portal/app/services/search/AkkaStreamsIndexMediator.scala index c4f9ed68f..b3ebaa6d2 100644 --- a/modules/portal/app/services/search/AkkaStreamsIndexMediator.scala +++ b/modules/portal/app/services/search/AkkaStreamsIndexMediator.scala @@ -58,7 +58,6 @@ case class AkkaStreamsIndexMediatorHandle( private val solrServiceConfig = ServiceConfig("solr", config) private val solrBaseUrl: Uri = solrServiceConfig.baseUrl + "/update" private val jsonSupport = EntityStreamingSupport.json(Integer.MAX_VALUE) - private val jsonConverter = new JsonConverter private val mapper = new ObjectMapper private val writer = mapper.writer().withDefaultPrettyPrinter() @@ -97,7 +96,7 @@ case class AkkaStreamsIndexMediatorHandle( .map(node => ByteString.fromArray(writer.writeValueAsBytes(node))) .named("json-node-to-bytes") - private val jsonNodeToDoc: Flow[JsonNode, JsonNode, akka.NotUsed] = Flow[JsonNode] + private def jsonNodeToDoc(jsonConverter: JsonConverter): Flow[JsonNode, JsonNode, akka.NotUsed] = Flow[JsonNode] .mapConcat[JsonNode](n => jsonConverter.convert(n).asScala.toVector) .named("json-node-to-solr-doc") @@ -138,6 +137,7 @@ case class AkkaStreamsIndexMediatorHandle( val init = java.time.Instant.now() var count = 0 + val jsonConverter = new JsonConverter val bytesFlow = Source(setCommonHeaders(requests)) .map { case (r, uri) => @@ -156,7 +156,7 @@ case class AkkaStreamsIndexMediatorHandle( Source.future(s).flatMapConcat(n => n) } .flatMapConcat(n => n) // flatten the streams of JsonNodes into one stream - .via(jsonNodeToDoc) + .via(jsonNodeToDoc(jsonConverter)) .map { node => logger.trace(writer.writeValueAsString(node)) count = count + 1