Skip to content
This repository has been archived by the owner on Nov 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #917 from egovernments/KafkaIssue
Browse files Browse the repository at this point in the history
Added changes for Demand Generatio Kafka Lag issue. Used kafka template
  • Loading branch information
pradeepkumarcm-egov authored Aug 19, 2024
2 parents 0547b02 + 0983802 commit 0f81711
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.egov.wscalculation.web.models.users.UserDetailResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
Expand Down Expand Up @@ -71,6 +72,9 @@ public class DemandGenerationConsumer {
@Autowired
private NotificationUtil util;

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private CalculatorUtil calculatorUtils;

Expand Down Expand Up @@ -192,7 +196,7 @@ private void generateDemandInBatch(CalculationReq request, Map<String, Object> m
wsCalulationWorkflowValidator.applicationValidation(request.getRequestInfo(), criteria.getTenantId(),
criteria.getConnectionNo(), genratedemand);
}*/
System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo());
//System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo());
wSCalculationServiceImpl.bulkDemandGeneration(request, masterMap);
/*String connectionNoStrings = request.getCalculationCriteria().stream()
.map(criteria -> criteria.getConnectionNo()).collect(Collectors.toSet()).toString();
Expand Down Expand Up @@ -268,13 +272,13 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
Long dayEndTime = LocalDateTime
.of(toDate.getYear(), toDate.getMonth(), toDate.getDayOfMonth(), 23, 59, 59, 999000000)
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();

Long StartTimeForGetConnetion = System.currentTimeMillis();

List<String> connectionNos = waterCalculatorDao.getNonMeterConnectionsList(tenantId, dayStartTime, dayEndTime);



Calendar previousFromDate = Calendar.getInstance();
/*Calendar previousFromDate = Calendar.getInstance();
Calendar previousToDate = Calendar.getInstance();
previousFromDate.setTimeInMillis(dayStartTime);
Expand All @@ -283,26 +287,27 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month
previousToDate.add(Calendar.MONTH, -1);
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH);
previousToDate.set(Calendar.DAY_OF_MONTH, max);
previousToDate.set(Calendar.DAY_OF_MONTH, max);*/
String assessmentYear = estimationService.getAssessmentYear();
ArrayList<String> failedConnectionNos = new ArrayList<String>();

Long startTimeForMdms= System.
currentTimeMillis();
Map<String, Object> masterMap = mDataService.loadMasterData(requestInfo,
tenantId);

log.info("connectionNos" + connectionNos.size());
log.info("connectionNos" + connectionNos);
log.info("dayStartTime:"+dayStartTime);
log.info("dayEndTime"+dayEndTime);

long startTimeForLoop= System.currentTimeMillis();
for (String connectionNo : connectionNos) {
long timeBeforePushToKafka = System.currentTimeMillis();
CalculationCriteria calculationCriteria = CalculationCriteria.builder().tenantId(tenantId)
.assessmentYear(assessmentYear).connectionNo(connectionNo).from(dayStartTime).to(dayEndTime).build();
List<CalculationCriteria> calculationCriteriaList = new ArrayList<>();
calculationCriteriaList.add(calculationCriteria);
CalculationReq calculationReq = CalculationReq.builder().calculationCriteria(calculationCriteriaList)
.requestInfo(requestInfo).isconnectionCalculation(true).isAdvanceCalculation(false).build();

Set<String> consumerCodes = new LinkedHashSet<String>();
/*Set<String> consumerCodes = new LinkedHashSet<String>();
consumerCodes.add(connectionNo);
if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(),
Expand All @@ -312,7 +317,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
log.warn("this connection doen't have the demand in previous billing cycle :" + connectionNo);
failedConnectionNos.add(connectionNo);
continue;
}
}*/
HashMap<Object, Object> genarateDemandData = new HashMap<Object, Object>();
genarateDemandData.put("calculationReq", calculationReq);
genarateDemandData.put("billingCycle",billingCycle);
Expand All @@ -327,10 +332,14 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
* log.warn("this connection doen't have the demand in previous billing cycle :"
* + connectionNo ); continue; }
*/
log.info("sending generate demand for connection no :"+connectionNo);
producer.push(config.getWsGenerateDemandBulktopic(),genarateDemandData);

long timetakenToPush= System.currentTimeMillis();
kafkaTemplate.send(config.getWsGenerateDemandBulktopic(),genarateDemandData);

}
log.info("Time taken for the for loop : "+(System.currentTimeMillis()-startTimeForLoop)/1000+ " Secondss");

Long starttimeforNotification= System.currentTimeMillis();
HashMap<String, String> demandMessage = util.getLocalizationMessage(requestInfo,
WSCalculationConstant.mGram_Consumer_NewDemand, tenantId);
HashMap<String, String> gpwscMap = util.getLocalizationMessage(requestInfo, tenantId, tenantId);
Expand Down Expand Up @@ -362,7 +371,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
producer.push(config.getSmsNotifTopic(), smsRequest);
}
}

log.info("Time taken for notification : "+(System.currentTimeMillis()-starttimeforNotification)/1000+ " Secondss");
});
/* if (isSendMessage && failedConnectionNos.size() > 0) {
List<ActionItem> actionItems = new ArrayList<>();
Expand Down Expand Up @@ -577,10 +586,15 @@ public void generateBulkDemandForULB(HashMap<Object, Object> messageData) {
String billingPeriod = bulkDemand.getBillingPeriod();
if (StringUtils.isEmpty(billingPeriod))
throw new CustomException("BILLING_PERIOD_PARSING_ISSUE", "Billing Period can not be empty!!");
log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic for tenantid:"
+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" Start Time:"+System.currentTimeMillis() );
Long starTime = System.currentTimeMillis();
log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic" );
generateDemandAndSendnotification(bulkDemand.getRequestInfo(), bulkDemand.getTenantId(), billingPeriod, billingMasterData,
isSendMessage, isManual);

long endTime=System.currentTimeMillis();
long diff = endTime-starTime;
log.info("time takenn to generate demand for Tenantid:"+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" is : "+diff/1000 +" seconds");
}
@KafkaListener(topics = {
"${egov.update.demand.add.penalty}" })
Expand All @@ -603,9 +617,44 @@ public void generateDemandInBulkListner(HashMap<Object, Object> messageData) {
billingCycle= (String) genarateDemandData.get("billingCycle");
isSendMessage= (boolean) genarateDemandData.get("isSendMessage");
tenantId=(String) genarateDemandData.get("tenantId");
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/MM/yyyy");


log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());
generateDemandInBulk(calculationReq,billingCycle,masterMap,isSendMessage,tenantId);
LocalDate fromDate = LocalDate.parse(billingCycle.split("-")[0].trim(), formatter);
LocalDate toDate = LocalDate.parse(billingCycle.split("-")[1].trim(), formatter);

Long dayStartTime = LocalDateTime
.of(fromDate.getYear(), fromDate.getMonth(), fromDate.getDayOfMonth(), 0, 0, 0)
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
Long dayEndTime = LocalDateTime
.of(toDate.getYear(), toDate.getMonth(), toDate.getDayOfMonth(), 23, 59, 59, 999000000)
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
Calendar previousFromDate = Calendar.getInstance();
Calendar previousToDate = Calendar.getInstance();

previousFromDate.setTimeInMillis(dayStartTime);
previousToDate.setTimeInMillis(dayEndTime);

previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month
previousToDate.add(Calendar.MONTH, -1);
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH);
previousToDate.set(Calendar.DAY_OF_MONTH, max);
//log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());
Set<String> consumerCodes = new LinkedHashSet<String>();
consumerCodes.add(calculationReq.getCalculationCriteria().get(0).getConnectionNo());
if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(),
previousToDate.getTimeInMillis(), consumerCodes)
&& !waterCalculatorDao.isConnectionExists(tenantId, previousFromDate.getTimeInMillis(),
previousToDate.getTimeInMillis(), consumerCodes)) {
log.warn("this connection doen't have the demand in previous billing cycle :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo());
} else {
Long starttime = System.currentTimeMillis();
generateDemandInBulk(calculationReq, billingCycle, masterMap, isSendMessage, tenantId);
log.info("GOt call inn ws-gennerate-demand-bulk topic end time:" + System.currentTimeMillis());
Long endtime = System.currentTimeMillis();
long diff = endtime - starttime;
log.info("Time taken to process request for :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo() + " is :" + diff / 1000 + " secs");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ spring.kafka.consumer.group-id=egov-ws-calc-services
spring.kafka.consumer.properties.spring.json.use.type.headers=false
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.linger.ms=10
spring.kafka.producer.batch.size=32768
spring.kafka.producer.buffer.memory=33554432
spring.kafka.consumer.session.timeout.ms=30000
spring.kafka.consumer.heartbeat.interval.ms=10000
spring.kafka.consumer.max.poll.interval.ms=600000

$KAFKA TOPIC DETAILS
egov.watercalculatorservice.createdemand.topic=ws-generate-demand
Expand Down

0 comments on commit 0f81711

Please sign in to comment.