From 0983802a333e55b6b58d725465ebc8a2afa1d60d Mon Sep 17 00:00:00 2001 From: debasishchakraborty-egovt Date: Mon, 19 Aug 2024 11:52:45 +0530 Subject: [PATCH] Added changes for Demand Generatio Kafka Lag issue. Used kafka template --- .../consumer/DemandGenerationConsumer.java | 81 +++++++++++++++---- .../src/main/resources/application.properties | 6 ++ 2 files changed, 71 insertions(+), 16 deletions(-) diff --git a/municipal-services/ws-calculator/src/main/java/org/egov/wscalculation/consumer/DemandGenerationConsumer.java b/municipal-services/ws-calculator/src/main/java/org/egov/wscalculation/consumer/DemandGenerationConsumer.java index 38b916400..ef713e6c1 100644 --- a/municipal-services/ws-calculator/src/main/java/org/egov/wscalculation/consumer/DemandGenerationConsumer.java +++ b/municipal-services/ws-calculator/src/main/java/org/egov/wscalculation/consumer/DemandGenerationConsumer.java @@ -37,6 +37,7 @@ import org.egov.wscalculation.web.models.users.UserDetailResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @@ -71,6 +72,9 @@ public class DemandGenerationConsumer { @Autowired private NotificationUtil util; + @Autowired + private KafkaTemplate kafkaTemplate; + @Autowired private CalculatorUtil calculatorUtils; @@ -192,7 +196,7 @@ private void generateDemandInBatch(CalculationReq request, Map m wsCalulationWorkflowValidator.applicationValidation(request.getRequestInfo(), criteria.getTenantId(), criteria.getConnectionNo(), genratedemand); }*/ - System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo()); + //System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo()); wSCalculationServiceImpl.bulkDemandGeneration(request, masterMap); /*String connectionNoStrings = request.getCalculationCriteria().stream() .map(criteria -> criteria.getConnectionNo()).collect(Collectors.toSet()).toString(); @@ -268,13 +272,13 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t Long dayEndTime = LocalDateTime .of(toDate.getYear(), toDate.getMonth(), toDate.getDayOfMonth(), 23, 59, 59, 999000000) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - + Long StartTimeForGetConnetion = System.currentTimeMillis(); List connectionNos = waterCalculatorDao.getNonMeterConnectionsList(tenantId, dayStartTime, dayEndTime); - Calendar previousFromDate = Calendar.getInstance(); + /*Calendar previousFromDate = Calendar.getInstance(); Calendar previousToDate = Calendar.getInstance(); previousFromDate.setTimeInMillis(dayStartTime); @@ -283,18 +287,19 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month previousToDate.add(Calendar.MONTH, -1); int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH); - previousToDate.set(Calendar.DAY_OF_MONTH, max); + previousToDate.set(Calendar.DAY_OF_MONTH, max);*/ String assessmentYear = estimationService.getAssessmentYear(); ArrayList failedConnectionNos = new ArrayList(); + + Long startTimeForMdms= System. + currentTimeMillis(); Map masterMap = mDataService.loadMasterData(requestInfo, tenantId); log.info("connectionNos" + connectionNos.size()); - log.info("connectionNos" + connectionNos); - log.info("dayStartTime:"+dayStartTime); - log.info("dayEndTime"+dayEndTime); - + long startTimeForLoop= System.currentTimeMillis(); for (String connectionNo : connectionNos) { + long timeBeforePushToKafka = System.currentTimeMillis(); CalculationCriteria calculationCriteria = CalculationCriteria.builder().tenantId(tenantId) .assessmentYear(assessmentYear).connectionNo(connectionNo).from(dayStartTime).to(dayEndTime).build(); List calculationCriteriaList = new ArrayList<>(); @@ -302,7 +307,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t CalculationReq calculationReq = CalculationReq.builder().calculationCriteria(calculationCriteriaList) .requestInfo(requestInfo).isconnectionCalculation(true).isAdvanceCalculation(false).build(); - Set consumerCodes = new LinkedHashSet(); + /*Set consumerCodes = new LinkedHashSet(); consumerCodes.add(connectionNo); if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(), @@ -312,7 +317,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t log.warn("this connection doen't have the demand in previous billing cycle :" + connectionNo); failedConnectionNos.add(connectionNo); continue; - } + }*/ HashMap genarateDemandData = new HashMap(); genarateDemandData.put("calculationReq", calculationReq); genarateDemandData.put("billingCycle",billingCycle); @@ -327,10 +332,14 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t * log.warn("this connection doen't have the demand in previous billing cycle :" * + connectionNo ); continue; } */ - log.info("sending generate demand for connection no :"+connectionNo); - producer.push(config.getWsGenerateDemandBulktopic(),genarateDemandData); + + long timetakenToPush= System.currentTimeMillis(); + kafkaTemplate.send(config.getWsGenerateDemandBulktopic(),genarateDemandData); } + log.info("Time taken for the for loop : "+(System.currentTimeMillis()-startTimeForLoop)/1000+ " Secondss"); + + Long starttimeforNotification= System.currentTimeMillis(); HashMap demandMessage = util.getLocalizationMessage(requestInfo, WSCalculationConstant.mGram_Consumer_NewDemand, tenantId); HashMap gpwscMap = util.getLocalizationMessage(requestInfo, tenantId, tenantId); @@ -362,7 +371,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t producer.push(config.getSmsNotifTopic(), smsRequest); } } - + log.info("Time taken for notification : "+(System.currentTimeMillis()-starttimeforNotification)/1000+ " Secondss"); }); /* if (isSendMessage && failedConnectionNos.size() > 0) { List actionItems = new ArrayList<>(); @@ -577,10 +586,15 @@ public void generateBulkDemandForULB(HashMap messageData) { String billingPeriod = bulkDemand.getBillingPeriod(); if (StringUtils.isEmpty(billingPeriod)) throw new CustomException("BILLING_PERIOD_PARSING_ISSUE", "Billing Period can not be empty!!"); + log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic for tenantid:" + +bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" Start Time:"+System.currentTimeMillis() ); + Long starTime = System.currentTimeMillis(); log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic" ); generateDemandAndSendnotification(bulkDemand.getRequestInfo(), bulkDemand.getTenantId(), billingPeriod, billingMasterData, isSendMessage, isManual); - + long endTime=System.currentTimeMillis(); + long diff = endTime-starTime; + log.info("time takenn to generate demand for Tenantid:"+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" is : "+diff/1000 +" seconds"); } @KafkaListener(topics = { "${egov.update.demand.add.penalty}" }) @@ -603,9 +617,44 @@ public void generateDemandInBulkListner(HashMap messageData) { billingCycle= (String) genarateDemandData.get("billingCycle"); isSendMessage= (boolean) genarateDemandData.get("isSendMessage"); tenantId=(String) genarateDemandData.get("tenantId"); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/MM/yyyy"); + - log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo()); - generateDemandInBulk(calculationReq,billingCycle,masterMap,isSendMessage,tenantId); + LocalDate fromDate = LocalDate.parse(billingCycle.split("-")[0].trim(), formatter); + LocalDate toDate = LocalDate.parse(billingCycle.split("-")[1].trim(), formatter); + + Long dayStartTime = LocalDateTime + .of(fromDate.getYear(), fromDate.getMonth(), fromDate.getDayOfMonth(), 0, 0, 0) + .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + Long dayEndTime = LocalDateTime + .of(toDate.getYear(), toDate.getMonth(), toDate.getDayOfMonth(), 23, 59, 59, 999000000) + .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + Calendar previousFromDate = Calendar.getInstance(); + Calendar previousToDate = Calendar.getInstance(); + + previousFromDate.setTimeInMillis(dayStartTime); + previousToDate.setTimeInMillis(dayEndTime); + + previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month + previousToDate.add(Calendar.MONTH, -1); + int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH); + previousToDate.set(Calendar.DAY_OF_MONTH, max); + //log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo()); + Set consumerCodes = new LinkedHashSet(); + consumerCodes.add(calculationReq.getCalculationCriteria().get(0).getConnectionNo()); + if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(), + previousToDate.getTimeInMillis(), consumerCodes) + && !waterCalculatorDao.isConnectionExists(tenantId, previousFromDate.getTimeInMillis(), + previousToDate.getTimeInMillis(), consumerCodes)) { + log.warn("this connection doen't have the demand in previous billing cycle :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo()); + } else { + Long starttime = System.currentTimeMillis(); + generateDemandInBulk(calculationReq, billingCycle, masterMap, isSendMessage, tenantId); + log.info("GOt call inn ws-gennerate-demand-bulk topic end time:" + System.currentTimeMillis()); + Long endtime = System.currentTimeMillis(); + long diff = endtime - starttime; + log.info("Time taken to process request for :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo() + " is :" + diff / 1000 + " secs"); + } } } diff --git a/municipal-services/ws-calculator/src/main/resources/application.properties b/municipal-services/ws-calculator/src/main/resources/application.properties index 86620d15d..8847a731e 100644 --- a/municipal-services/ws-calculator/src/main/resources/application.properties +++ b/municipal-services/ws-calculator/src/main/resources/application.properties @@ -22,6 +22,12 @@ spring.kafka.consumer.group-id=egov-ws-calc-services spring.kafka.consumer.properties.spring.json.use.type.headers=false spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer +spring.kafka.producer.linger.ms=10 +spring.kafka.producer.batch.size=32768 +spring.kafka.producer.buffer.memory=33554432 +spring.kafka.consumer.session.timeout.ms=30000 +spring.kafka.consumer.heartbeat.interval.ms=10000 +spring.kafka.consumer.max.poll.interval.ms=600000 $KAFKA TOPIC DETAILS egov.watercalculatorservice.createdemand.topic=ws-generate-demand