Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #LR-766 feat: Fixed Promise error. #1252

Merged
merged 2 commits into from
May 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,11 @@ public class ElasticSearchRestHighImpl implements ElasticSearchService {
@Override
public Future<String> save(String index, String identifier, Map<String, Object> data, RequestContext context) {
long startTime = System.currentTimeMillis();
Promise<String> promise = Futures.promise();
logger.debug(context,
"ElasticSearchUtilRest:save: method started at ==" + startTime + " for Index " + index);
Promise<String> promise = (Promise<String>) (Promise<?>) Futures.promise();

logger.debug(context, "ElasticSearchUtilRest:save: method started at ==" + startTime + " for Index " + index);
if (StringUtils.isBlank(identifier) || StringUtils.isBlank(index)) {
logger.info(context,
"ElasticSearchRestHighImpl:save: "
logger.info(context, "ElasticSearchRestHighImpl:save: "
+ "Identifier or Index value is null or empty, identifier : " + identifier
+ ",index: " + index + ",not able to save data.");
promise.success(ERROR);
Expand All @@ -88,26 +87,22 @@ public Future<String> save(String index, String identifier, Map<String, Object>
new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
logger.info(context,
"ElasticSearchRestHighImpl:save: Success for index : " + index
logger.info(context, "ElasticSearchRestHighImpl:save: Success for index : " + index
+ ", identifier :" + identifier);

promise.success(indexResponse.getId());
logger.debug(context,
"ElasticSearchRestHighImpl:save: method end at ==" + System.currentTimeMillis()
logger.debug(context, "ElasticSearchRestHighImpl:save: method end at ==" + System.currentTimeMillis()
+ " for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
}

@Override
public void onFailure(Exception e) {
promise.failure(e);
logger.error(context,
"ElasticSearchRestHighImpl:save: "
logger.error(context, "ElasticSearchRestHighImpl:save: "
+ "Error while saving " + index
+ " id : " + identifier, e);
logger.debug(context,
"ElasticSearchRestHighImpl:save: method end at ==" + System.currentTimeMillis()
logger.debug(context, "ElasticSearchRestHighImpl:save: method end at ==" + System.currentTimeMillis()
+ " for INdex " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
}
Expand All @@ -130,10 +125,9 @@ public void onFailure(Exception e) {
@Override
public Future<Boolean> update(String index, String documentId, Map<String, Object> document, RequestContext context) {
long startTime = System.currentTimeMillis();
logger.debug(context,
"ElasticSearchRestHighImpl:update: method started at ==" + startTime
logger.debug(context, "ElasticSearchRestHighImpl:update: method started at ==" + startTime
+ " for Index " + index);
Promise<Boolean> promise = Futures.promise();
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
document.put("identifier", documentId);

if (!StringUtils.isBlank(index) && !StringUtils.isBlank(documentId)) {
Expand All @@ -146,12 +140,10 @@ public Future<Boolean> update(String index, String documentId, Map<String, Objec
@Override
public void onResponse(UpdateResponse updateResponse) {
promise.success(true);
logger.info(context,
"ElasticSearchRestHighImpl:update: Success with " + updateResponse.getResult()
logger.info(context, "ElasticSearchRestHighImpl:update: Success with " + updateResponse.getResult()
+ " response from elastic search for index" + index
+ ",documentId : " + documentId);
logger.debug(context,
"ElasticSearchRestHighImpl:update: method end =="
logger.debug(context, "ElasticSearchRestHighImpl:update: method end =="
+ " for INdex " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
}
Expand Down Expand Up @@ -180,13 +172,11 @@ public void onFailure(Exception e) {
* @return Map<String,Object> or empty map
*/
@Override
public Future<Map<String, Object>> getDataByIdentifier(
String index, String identifier, RequestContext context) {
public Future<Map<String, Object>> getDataByIdentifier(String index, String identifier, RequestContext context) {
long startTime = System.currentTimeMillis();
Promise<Map<String, Object>> promise = Futures.promise();
Promise<Map<String, Object>> promise = (Promise<Map<String, Object>>) (Promise<?>) Futures.promise();
if (StringUtils.isNotEmpty(identifier) && StringUtils.isNotEmpty(index)) {
logger.debug(context,
"ElasticSearchRestHighImpl:getDataByIdentifier: method started at ==" + startTime
logger.debug(context, "ElasticSearchRestHighImpl:getDataByIdentifier: method started at ==" + startTime
+ " for Index " + index);

GetRequest getRequest = new GetRequest(index, _DOC, identifier);
Expand All @@ -199,9 +189,7 @@ public void onResponse(GetResponse getResponse) {
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
if (MapUtils.isNotEmpty(sourceAsMap)) {
promise.success(sourceAsMap);
logger.debug(context,
"ElasticSearchRestHighImpl:getDataByIdentifier: method end =="
+ " for Index " + index
logger.debug(context, "ElasticSearchRestHighImpl:getDataByIdentifier: method end == for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
} else {
promise.success(new HashMap<>());
Expand All @@ -220,8 +208,7 @@ public void onFailure(Exception e) {

ConnectionManager.getRestClient().getAsync(getRequest, RequestOptions.DEFAULT, listener);
} else {
logger.info(context,
"ElasticSearchRestHighImpl:getDataByIdentifier: "
logger.info(context, "ElasticSearchRestHighImpl:getDataByIdentifier: "
+ "provided index or identifier is null, index = " + index
+ ", identifier = " + identifier);
promise.failure(ProjectUtil.createClientException(ResponseCode.invalidRequestData));
Expand All @@ -241,7 +228,7 @@ public void onFailure(Exception e) {
public Future<Boolean> delete(String index, String identifier, RequestContext context) {
long startTime = System.currentTimeMillis();
logger.debug(context, "ElasticSearchRestHighImpl:delete: method started at ==" + startTime);
Promise<Boolean> promise = Futures.promise();
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
if (StringUtils.isNotEmpty(identifier) && StringUtils.isNotEmpty(index)) {
DeleteRequest delRequest = new DeleteRequest(index, _DOC, identifier);
ActionListener<DeleteResponse> listener =
Expand All @@ -267,15 +254,13 @@ public void onFailure(Exception e) {

ConnectionManager.getRestClient().deleteAsync(delRequest, RequestOptions.DEFAULT, listener);
} else {
logger.info(context,
"ElasticSearchRestHighImpl:delete: "
logger.info(context, "ElasticSearchRestHighImpl:delete: "
+ "provided index or identifier is null, index = " + index
+ ", identifier = " + identifier);
promise.failure(ProjectUtil.createClientException(ResponseCode.invalidRequestData));
}

logger.debug(context,
"ElasticSearchRestHighImpl:delete: method end =="
logger.debug(context, "ElasticSearchRestHighImpl:delete: method end =="
+ " ,Total time elapsed = " + calculateEndTime(startTime));
return promise.future();
}
Expand Down Expand Up @@ -380,12 +365,11 @@ public Future<Map<String, Object>> search(SearchDTO searchDTO, String index, Req
if (null != searchDTO.getFacets() && !searchDTO.getFacets().isEmpty()) {
searchSourceBuilder = addAggregations(searchSourceBuilder, searchDTO.getFacets());
}
logger.info(context,
"ElasticSearchRestHighImpl:search: calling search for index " + index
logger.info(context, "ElasticSearchRestHighImpl:search: calling search for index " + index
+ ", with query = " + searchSourceBuilder.toString());

searchRequest.source(searchSourceBuilder);
Promise<Map<String, Object>> promise = Futures.promise();
Promise<Map<String, Object>> promise = (Promise<Map<String, Object>>) (Promise<?>) Futures.promise();

ActionListener<SearchResponse> listener =
new ActionListener<SearchResponse>() {
Expand All @@ -401,8 +385,7 @@ public void onResponse(SearchResponse response) {
promise.success(responseMap);
} else {
Map<String, Object> responseMap = ElasticSearchHelper.getSearchResponseMap(response, searchDTO, finalFacetList);
logger.debug(context,
"ElasticSearchRestHighImpl:search: method end "
logger.debug(context, "ElasticSearchRestHighImpl:search: method end "
+ " ,Total time elapsed = " + calculateEndTime(startTime));
promise.success(responseMap);
}
Expand All @@ -412,8 +395,7 @@ public void onResponse(SearchResponse response) {
public void onFailure(Exception e) {
promise.failure(e);

logger.debug(context,
"ElasticSearchRestHighImpl:search: method end for Index " + index
logger.debug(context, "ElasticSearchRestHighImpl:search: method end for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
logger.error(context, "ElasticSearchRestHighImpl:search: method Failed with error :", e);
}
Expand All @@ -431,7 +413,7 @@ public void onFailure(Exception e) {
@Override
public Future<Boolean> healthCheck() {
GetIndexRequest indexRequest = new GetIndexRequest().indices(ProjectUtil.EsType.user.getTypeName());
Promise<Boolean> promise = Futures.promise();
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
ActionListener<Boolean> listener =
new ActionListener<Boolean>() {
@Override
Expand Down Expand Up @@ -465,11 +447,10 @@ public void onFailure(Exception e) {
@Override
public Future<Boolean> bulkInsert(String index, List<Map<String, Object>> dataList, RequestContext context) {
long startTime = System.currentTimeMillis();
logger.debug(context,
"ElasticSearchRestHighImpl:bulkInsert: method started at ==" + startTime
logger.debug(context, "ElasticSearchRestHighImpl:bulkInsert: method started at ==" + startTime
+ " for Index " + index);
BulkRequest request = new BulkRequest();
Promise<Boolean> promise = Futures.promise();
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
for (Map<String, Object> data : dataList) {
data.put("identifier", data.get(JsonKey.ID));
request.add(new IndexRequest(index, _DOC, (String) data.get(JsonKey.ID)).source(data));
Expand All @@ -486,8 +467,7 @@ public void onResponse(BulkResponse bulkResponse) {
BulkItemResponse bResponse = responseItr.next();

if (bResponse.isFailed()) {
logger.info(context,
"ElasticSearchRestHighImpl:bulkinsert: api response===" + bResponse.getId()
logger.info(context, "ElasticSearchRestHighImpl:bulkinsert: api response===" + bResponse.getId()
+ " " + bResponse.getFailureMessage());
}
}
Expand All @@ -502,9 +482,7 @@ public void onFailure(Exception e) {
};
ConnectionManager.getRestClient().bulkAsync(request, RequestOptions.DEFAULT, listener);

logger.debug(context,
"ElasticSearchRestHighImpl:bulkInsert: method end =="
+ " for Index " + index
logger.debug(context, "ElasticSearchRestHighImpl:bulkInsert: method end == for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
return promise.future();
}
Expand All @@ -531,8 +509,7 @@ private static SearchSourceBuilder addAggregations(SearchSourceBuilder searchSou
AggregationBuilders.terms(key).field(key + ElasticSearchHelper.RAW_APPEND));
}
}
logger.debug(null,
"ElasticSearchRestHighImpl:addAggregations: method end =="
logger.debug(null, "ElasticSearchRestHighImpl:addAggregations: method end =="
+ " ,Total time elapsed = " + calculateEndTime(startTime));
return searchSourceBuilder;
}
Expand All @@ -550,9 +527,8 @@ private static SearchSourceBuilder addAggregations(SearchSourceBuilder searchSou
@Override
public Future<Boolean> upsert(String index, String identifier, Map<String, Object> data, RequestContext context) {
long startTime = System.currentTimeMillis();
Promise<Boolean> promise = Futures.promise();
logger.debug(context,
"ElasticSearchRestHighImpl:upsert: method started at ==" + startTime
Promise<Boolean> promise = (Promise<Boolean>) (Promise<?>) Futures.promise();
logger.debug(context, "ElasticSearchRestHighImpl:upsert: method started at ==" + startTime
+ " for INdex " + index);
if (!StringUtils.isBlank(index)
&& !StringUtils.isBlank(identifier)
Expand All @@ -568,13 +544,10 @@ public Future<Boolean> upsert(String index, String identifier, Map<String, Objec
@Override
public void onResponse(UpdateResponse updateResponse) {
promise.success(true);
logger.info(context,
"ElasticSearchRestHighImpl:upsert: Response for index : " + updateResponse.getResult()
logger.info(context, "ElasticSearchRestHighImpl:upsert: Response for index : " + updateResponse.getResult()
+ "," + index
+ ",identifier : " + identifier);
logger.debug(context,
"ElasticSearchRestHighImpl:upsert: method end =="
+ " for Index " + index
logger.debug(context, "ElasticSearchRestHighImpl:upsert: method end == for Index " + index
+ " ,Total time elapsed = " + calculateEndTime(startTime));
}

Expand Down Expand Up @@ -614,9 +587,8 @@ public Future<Map<String, Map<String, Object>>> getEsResultByListOfIds(List<Stri
Future<Map<String, Object>> resultF = search(searchDTO, index, null);
Map<String, Object> result = (Map<String, Object>) ElasticSearchHelper.getResponseFromFuture(resultF);
List<Map<String, Object>> esContent = (List<Map<String, Object>>) result.get(JsonKey.CONTENT);
Promise<Map<String, Map<String, Object>>> promise = Futures.promise();
promise.success(
esContent.stream().collect(Collectors.toMap(
Promise<Map<String, Map<String, Object>>> promise = (Promise<Map<String, Map<String, Object>>>) (Promise<?>) Futures.promise();
promise.success(esContent.stream().collect(Collectors.toMap(
obj -> {
return (String) obj.get("id");
},
Expand Down