Skip to content

Commit

Permalink
Issue #LR-766 feat: Fixed Promise error. (#1252)
Browse files Browse the repository at this point in the history
  • Loading branch information
AmiableAnil authored May 23, 2024
1 parent 6467848 commit 75246aa
Showing 1 changed file with 36 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,11 @@ public class ElasticSearchRestHighImpl implements ElasticSearchService {
@Override
public Future<String> save(String index, String identifier, Map<String, Object> data, RequestContext context) {
long startTime = System.currentTimeMillis();
Promise<String> promise = Futures.promise();
logger.debug(context,
"ElasticSearchUtilRest:save: method started at ==" + startTime + " for Index " + index);
Promise<String> promise = (Promise<String>) (Promise<?>) Futures.promise();

logger.debug(context, "ElasticSearchUtilRest:save: method started at ==" + startTime + " for Index " + index);
if (StringUtils.isBlank(identifier) || StringUtils.isBlank(index)) {
logger.info(context,
"ElasticSearchRestHighImpl:save: "
logger.info(context, "ElasticSearchRestHighImpl:save: "
+ "Identifier or Index value is null or empty, identifier : " + identifier
+ ",index: " + index + ",not able to save data.");
promise.success(ERROR);
Expand All @@ -88,26 +87,22 @@ public Future<String> save(String index, String identifier, Map<String, Object>
new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
logger.info(context,
"ElasticSearchRestHighImpl:save: Success for index : " + index
logger.info(context, "ElasticSearchRestHighImpl:save: Success for index : " + index
+ ", identifier :" + identifier);

promise.success(indexResponse.getId());
logger.debug(context,
"ElasticSearchRestHighImpl:save: method end at ==" + System.currentTimeMillis()
logger.debug(context, "ElasticSearchRestHighImpl:save: method end at ==" + System.currentTimeMillis()
+ " for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
}

@Override
public void onFailure(Exception e) {
promise.failure(e);
logger.error(context,
"ElasticSearchRestHighImpl:save: "
logger.error(context, "ElasticSearchRestHighImpl:save: "
+ "Error while saving " + index
+ " id : " + identifier, e);
logger.debug(context,
"ElasticSearchRestHighImpl:save: method end at ==" + System.currentTimeMillis()
logger.debug(context, "ElasticSearchRestHighImpl:save: method end at ==" + System.currentTimeMillis()
+ " for INdex " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
}
Expand All @@ -130,10 +125,9 @@ public void onFailure(Exception e) {
@Override
public Future<Boolean> update(String index, String documentId, Map<String, Object> document, RequestContext context) {
long startTime = System.currentTimeMillis();
logger.debug(context,
"ElasticSearchRestHighImpl:update: method started at ==" + startTime
logger.debug(context, "ElasticSearchRestHighImpl:update: method started at ==" + startTime
+ " for Index " + index);
Promise<Boolean> promise = Futures.promise();
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
document.put("identifier", documentId);

if (!StringUtils.isBlank(index) && !StringUtils.isBlank(documentId)) {
Expand All @@ -146,12 +140,10 @@ public Future<Boolean> update(String index, String documentId, Map<String, Objec
@Override
public void onResponse(UpdateResponse updateResponse) {
promise.success(true);
logger.info(context,
"ElasticSearchRestHighImpl:update: Success with " + updateResponse.getResult()
logger.info(context, "ElasticSearchRestHighImpl:update: Success with " + updateResponse.getResult()
+ " response from elastic search for index" + index
+ ",documentId : " + documentId);
logger.debug(context,
"ElasticSearchRestHighImpl:update: method end =="
logger.debug(context, "ElasticSearchRestHighImpl:update: method end =="
+ " for INdex " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
}
Expand Down Expand Up @@ -180,13 +172,11 @@ public void onFailure(Exception e) {
* @return Map<String,Object> or empty map
*/
@Override
public Future<Map<String, Object>> getDataByIdentifier(
String index, String identifier, RequestContext context) {
public Future<Map<String, Object>> getDataByIdentifier(String index, String identifier, RequestContext context) {
long startTime = System.currentTimeMillis();
Promise<Map<String, Object>> promise = Futures.promise();
Promise<Map<String, Object>> promise = (Promise<Map<String, Object>>) (Promise<?>) Futures.promise();
if (StringUtils.isNotEmpty(identifier) && StringUtils.isNotEmpty(index)) {
logger.debug(context,
"ElasticSearchRestHighImpl:getDataByIdentifier: method started at ==" + startTime
logger.debug(context, "ElasticSearchRestHighImpl:getDataByIdentifier: method started at ==" + startTime
+ " for Index " + index);

GetRequest getRequest = new GetRequest(index, _DOC, identifier);
Expand All @@ -199,9 +189,7 @@ public void onResponse(GetResponse getResponse) {
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
if (MapUtils.isNotEmpty(sourceAsMap)) {
promise.success(sourceAsMap);
logger.debug(context,
"ElasticSearchRestHighImpl:getDataByIdentifier: method end =="
+ " for Index " + index
logger.debug(context, "ElasticSearchRestHighImpl:getDataByIdentifier: method end == for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
} else {
promise.success(new HashMap<>());
Expand All @@ -220,8 +208,7 @@ public void onFailure(Exception e) {

ConnectionManager.getRestClient().getAsync(getRequest, RequestOptions.DEFAULT, listener);
} else {
logger.info(context,
"ElasticSearchRestHighImpl:getDataByIdentifier: "
logger.info(context, "ElasticSearchRestHighImpl:getDataByIdentifier: "
+ "provided index or identifier is null, index = " + index
+ ", identifier = " + identifier);
promise.failure(ProjectUtil.createClientException(ResponseCode.invalidRequestData));
Expand All @@ -241,7 +228,7 @@ public void onFailure(Exception e) {
public Future<Boolean> delete(String index, String identifier, RequestContext context) {
long startTime = System.currentTimeMillis();
logger.debug(context, "ElasticSearchRestHighImpl:delete: method started at ==" + startTime);
Promise<Boolean> promise = Futures.promise();
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
if (StringUtils.isNotEmpty(identifier) && StringUtils.isNotEmpty(index)) {
DeleteRequest delRequest = new DeleteRequest(index, _DOC, identifier);
ActionListener<DeleteResponse> listener =
Expand All @@ -267,15 +254,13 @@ public void onFailure(Exception e) {

ConnectionManager.getRestClient().deleteAsync(delRequest, RequestOptions.DEFAULT, listener);
} else {
logger.info(context,
"ElasticSearchRestHighImpl:delete: "
logger.info(context, "ElasticSearchRestHighImpl:delete: "
+ "provided index or identifier is null, index = " + index
+ ", identifier = " + identifier);
promise.failure(ProjectUtil.createClientException(ResponseCode.invalidRequestData));
}

logger.debug(context,
"ElasticSearchRestHighImpl:delete: method end =="
logger.debug(context, "ElasticSearchRestHighImpl:delete: method end =="
+ " ,Total time elapsed = " + calculateEndTime(startTime));
return promise.future();
}
Expand Down Expand Up @@ -380,12 +365,11 @@ public Future<Map<String, Object>> search(SearchDTO searchDTO, String index, Req
if (null != searchDTO.getFacets() && !searchDTO.getFacets().isEmpty()) {
searchSourceBuilder = addAggregations(searchSourceBuilder, searchDTO.getFacets());
}
logger.info(context,
"ElasticSearchRestHighImpl:search: calling search for index " + index
logger.info(context, "ElasticSearchRestHighImpl:search: calling search for index " + index
+ ", with query = " + searchSourceBuilder.toString());

searchRequest.source(searchSourceBuilder);
Promise<Map<String, Object>> promise = Futures.promise();
Promise<Map<String, Object>> promise = (Promise<Map<String, Object>>) (Promise<?>) Futures.promise();

ActionListener<SearchResponse> listener =
new ActionListener<SearchResponse>() {
Expand All @@ -401,8 +385,7 @@ public void onResponse(SearchResponse response) {
promise.success(responseMap);
} else {
Map<String, Object> responseMap = ElasticSearchHelper.getSearchResponseMap(response, searchDTO, finalFacetList);
logger.debug(context,
"ElasticSearchRestHighImpl:search: method end "
logger.debug(context, "ElasticSearchRestHighImpl:search: method end "
+ " ,Total time elapsed = " + calculateEndTime(startTime));
promise.success(responseMap);
}
Expand All @@ -412,8 +395,7 @@ public void onResponse(SearchResponse response) {
public void onFailure(Exception e) {
promise.failure(e);

logger.debug(context,
"ElasticSearchRestHighImpl:search: method end for Index " + index
logger.debug(context, "ElasticSearchRestHighImpl:search: method end for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
logger.error(context, "ElasticSearchRestHighImpl:search: method Failed with error :", e);
}
Expand All @@ -431,7 +413,7 @@ public void onFailure(Exception e) {
@Override
public Future<Boolean> healthCheck() {
GetIndexRequest indexRequest = new GetIndexRequest().indices(ProjectUtil.EsType.user.getTypeName());
Promise<Boolean> promise = Futures.promise();
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
ActionListener<Boolean> listener =
new ActionListener<Boolean>() {
@Override
Expand Down Expand Up @@ -465,11 +447,10 @@ public void onFailure(Exception e) {
@Override
public Future<Boolean> bulkInsert(String index, List<Map<String, Object>> dataList, RequestContext context) {
long startTime = System.currentTimeMillis();
logger.debug(context,
"ElasticSearchRestHighImpl:bulkInsert: method started at ==" + startTime
logger.debug(context, "ElasticSearchRestHighImpl:bulkInsert: method started at ==" + startTime
+ " for Index " + index);
BulkRequest request = new BulkRequest();
Promise<Boolean> promise = Futures.promise();
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
for (Map<String, Object> data : dataList) {
data.put("identifier", data.get(JsonKey.ID));
request.add(new IndexRequest(index, _DOC, (String) data.get(JsonKey.ID)).source(data));
Expand All @@ -486,8 +467,7 @@ public void onResponse(BulkResponse bulkResponse) {
BulkItemResponse bResponse = responseItr.next();

if (bResponse.isFailed()) {
logger.info(context,
"ElasticSearchRestHighImpl:bulkinsert: api response===" + bResponse.getId()
logger.info(context, "ElasticSearchRestHighImpl:bulkinsert: api response===" + bResponse.getId()
+ " " + bResponse.getFailureMessage());
}
}
Expand All @@ -502,9 +482,7 @@ public void onFailure(Exception e) {
};
ConnectionManager.getRestClient().bulkAsync(request, RequestOptions.DEFAULT, listener);

logger.debug(context,
"ElasticSearchRestHighImpl:bulkInsert: method end =="
+ " for Index " + index
logger.debug(context, "ElasticSearchRestHighImpl:bulkInsert: method end == for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
return promise.future();
}
Expand All @@ -531,8 +509,7 @@ private static SearchSourceBuilder addAggregations(SearchSourceBuilder searchSou
AggregationBuilders.terms(key).field(key + ElasticSearchHelper.RAW_APPEND));
}
}
logger.debug(null,
"ElasticSearchRestHighImpl:addAggregations: method end =="
logger.debug(null, "ElasticSearchRestHighImpl:addAggregations: method end =="
+ " ,Total time elapsed = " + calculateEndTime(startTime));
return searchSourceBuilder;
}
Expand All @@ -550,9 +527,8 @@ private static SearchSourceBuilder addAggregations(SearchSourceBuilder searchSou
@Override
public Future<Boolean> upsert(String index, String identifier, Map<String, Object> data, RequestContext context) {
long startTime = System.currentTimeMillis();
Promise<Boolean> promise = Futures.promise();
logger.debug(context,
"ElasticSearchRestHighImpl:upsert: method started at ==" + startTime
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
logger.debug(context, "ElasticSearchRestHighImpl:upsert: method started at ==" + startTime
+ " for INdex " + index);
if (!StringUtils.isBlank(index)
&& !StringUtils.isBlank(identifier)
Expand All @@ -568,13 +544,10 @@ public Future<Boolean> upsert(String index, String identifier, Map<String, Objec
@Override
public void onResponse(UpdateResponse updateResponse) {
promise.success(true);
logger.info(context,
"ElasticSearchRestHighImpl:upsert: Response for index : " + updateResponse.getResult()
logger.info(context, "ElasticSearchRestHighImpl:upsert: Response for index : " + updateResponse.getResult()
+ "," + index
+ ",identifier : " + identifier);
logger.debug(context,
"ElasticSearchRestHighImpl:upsert: method end =="
+ " for Index " + index
logger.debug(context, "ElasticSearchRestHighImpl:upsert: method end == for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
}

Expand Down Expand Up @@ -614,9 +587,8 @@ public Future<Map<String, Map<String, Object>>> getEsResultByListOfIds(List<Stri
Future<Map<String, Object>> resultF = search(searchDTO, index, null);
Map<String, Object> result = (Map<String, Object>) ElasticSearchHelper.getResponseFromFuture(resultF);
List<Map<String, Object>> esContent = (List<Map<String, Object>>) result.get(JsonKey.CONTENT);
Promise<Map<String, Map<String, Object>>> promise = Futures.promise();
promise.success(
esContent.stream().collect(Collectors.toMap(
Promise<Map<String, Map<String, Object>>> promise = (Promise<Map<String, Map<String, Object>>>) (Promise<?>) Futures.promise();
promise.success(esContent.stream().collect(Collectors.toMap(
obj -> {
return (String) obj.get("id");
},
Expand Down

0 comments on commit 75246aa

Please sign in to comment.