Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hometask 3 - Java concurrency made simpler #50

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ In pull request mark your mentor (@mentorName) so it can review it.
To complete task one you need to fix the DataProcessorTest unit tests.
All methods in DataProcessor with "7" in the name should be implemented using Java 7, while the others should be implemented with the Java 8 Stream API.

## Hometask 3
The AccidentDataProcessor sequentially reads, enriches and writes accident data to a file. Serial processing takes a long time. Use multithreading
to run these tasks in parallel, communicating with each other, to reduce the total processing time.

The task below is optional, for anyone interested in exploring further.
For a more complicated scenario, change PoliceForceExternalDataService.HALT_FOR_TO_TEST_THREAD_HANDLING to, for example, 20 minutes to
simulate the scenario where enrichment from the 3rd-party service may get stuck.
Currently this service method is called synchronously. Try to make this call asynchronous.


## Hometask 4
In the code you wrote for Hometask 3, please create tests to achieve at least 50% test coverage of your new classes. Use an IDE plugin to measure coverage. At least one integration test is required. Try to use a BDD approach.

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@
<version>1.12.1.RELEASE</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.5</version>
</dependency>

</dependencies>


Expand Down
48 changes: 48 additions & 0 deletions src/main/java/com/epam/concurrency/task/AccidentDataEnricher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.epam.concurrency.task;

import com.epam.data.RoadAccident;
import com.epam.dataservice.PoliceForceExternalDataService;

import java.util.ArrayList;
import java.util.List;

/**
* Created by Tanmoy on 6/16/2016.
*/
/**
 * Turns plain {@link RoadAccident} records into {@link RoadAccidentDetails} by adding the
 * contact number of the police force, looked up through {@link PoliceForceExternalDataService}.
 */
public class AccidentDataEnricher {

    private PoliceForceExternalDataService policeForceService = new PoliceForceExternalDataService();
    private static final int POLICE_FORCE_SERVICE_CALL_TIMEOUT_MS = 1000 * 30;

    /**
     * Enriches a whole batch, one item at a time and in input order.
     *
     * @param roadAccidents the batch to enrich; must not be {@code null}
     * @return a new list holding one enriched item per input accident
     */
    public List<RoadAccidentDetails> enrichRoadAccidentData(List<RoadAccident> roadAccidents){
        final List<RoadAccidentDetails> enrichedBatch = new ArrayList<>(roadAccidents.size());
        roadAccidents.forEach(accident -> enrichedBatch.add(enrichRoadAccidentDataItem(accident)));
        // Util.sleep() is defined elsewhere; presumably it simulates processing latency - confirm.
        Util.sleep();
        return enrichedBatch;
    }

    /**
     * Wraps a single accident in a {@link RoadAccidentDetails} and fills in the police force contact.
     *
     * @param roadAccident the accident to enrich
     * @return the enriched details object
     */
    public RoadAccidentDetails enrichRoadAccidentDataItem(RoadAccident roadAccident){
        final RoadAccidentDetails details = new RoadAccidentDetails(roadAccident);
        /*
         * The synchronous lookup below might block and make the application hang.
         *
         * Exercise: access the PoliceForceExternalDataService asynchronously with a
         * timeout of 30 s by using "enrichPoliceForceContactAsynchronously" instead.
         */
        enrichPoliceForceContactSynchronously(details);
        return details;
    }

    /** Looks up the contact number on the calling thread and stores it on the given details. */
    private void enrichPoliceForceContactSynchronously(RoadAccidentDetails roadAccidentDetails){
        roadAccidentDetails.setPoliceForceContact(
                policeForceService.getContactNo(roadAccidentDetails.getPoliceForce()));
    }

    /** Placeholder for the asynchronous variant of the lookup; intentionally not implemented yet. */
    private void enrichPoliceForceContactAsynchronously(RoadAccidentDetails roadAccidentDetails){
        //use POLICE_FORCE_SERVICE_CALL_TIMEOUT_MS
    }
}
67 changes: 67 additions & 0 deletions src/main/java/com/epam/concurrency/task/AccidentDataProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.epam.concurrency.task;

import com.epam.data.RoadAccident;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
* Created by Tanmoy on 6/17/2016.
*/
/**
 * Sequential pipeline that reads road accident records from a fixed set of CSV files,
 * enriches each batch and appends it to a single consolidated output file.
 *
 * <p>Typical usage: {@link #init()}, then {@link #process()}, then {@link #close()}.
 */
public class AccidentDataProcessor {

    private static final String FILE_PATH_1 = "src/main/resources/DfTRoadSafety_Accidents_2010.csv";
    private static final String FILE_PATH_2 = "src/main/resources/DfTRoadSafety_Accidents_2011.csv";
    private static final String FILE_PATH_3 = "src/main/resources/DfTRoadSafety_Accidents_2012.csv";
    private static final String FILE_PATH_4 = "src/main/resources/DfTRoadSafety_Accidents_2013.csv";

    private static final String OUTPUT_FILE_PATH = "target/DfTRoadSafety_Accidents_consolidated.csv";

    /** Batch size handed to the reader for every input file. */
    private static final int DATA_PROCESSING_BATCH_SIZE = 10000;

    private AccidentDataReader accidentDataReader = new AccidentDataReader();
    private AccidentDataEnricher accidentDataEnricher = new AccidentDataEnricher();
    private AccidentDataWriter accidentDataWriter = new AccidentDataWriter();

    private List<String> fileQueue = new ArrayList<String>();


    /**
     * Registers the input files and (re)creates the output file.
     * Safe to call more than once: the file queue is rebuilt instead of growing.
     */
    public void init(){
        // Without this, a second init() would queue every input file twice.
        fileQueue.clear();
        fileQueue.add(FILE_PATH_1);
        fileQueue.add(FILE_PATH_2);
        fileQueue.add(FILE_PATH_3);
        fileQueue.add(FILE_PATH_4);

        accidentDataWriter.init(OUTPUT_FILE_PATH);
    }

    /** Processes every queued input file, one after another, in queue order. */
    public void process(){
        for (String accidentDataFile : fileQueue){
            accidentDataReader.init(DATA_PROCESSING_BATCH_SIZE, accidentDataFile);
            processFile();
        }
    }

    /** Pulls batches from the current file until the reader reports that it has finished. */
    private void processFile(){
        while (!accidentDataReader.hasFinished()){
            processNextBatch();
        }
    }

    /** Reads one batch, enriches it and writes it to the output file. */
    public void processNextBatch(){
        List<RoadAccident> roadAccidents = accidentDataReader.getNextBatch();
        List<RoadAccidentDetails> roadAccidentDetailsList = accidentDataEnricher.enrichRoadAccidentData(roadAccidents);
        accidentDataWriter.writeAccidentData(roadAccidentDetailsList);
    }

    /** Releases the output file. Call once processing is done. */
    public void close(){
        accidentDataWriter.close();
    }

    /**
     * Runs the full pipeline and prints the elapsed time in whole seconds.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        AccidentDataProcessor dataProcessor = new AccidentDataProcessor();
        long start = System.currentTimeMillis();
        try {
            dataProcessor.init();
            dataProcessor.process();
        } finally {
            // Always release the output file, even when processing fails part-way.
            dataProcessor.close();
        }
        long end = System.currentTimeMillis();
        System.out.println("Process finished in s : " + (end-start)/1000);
    }

}
80 changes: 80 additions & 0 deletions src/main/java/com/epam/concurrency/task/AccidentDataReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.epam.concurrency.task;


import com.epam.data.RoadAccident;
import com.epam.dataservice.RoadAccidentParser;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Reads road accident records from a CSV file in batches of a configurable size.
 *
 * <p>Call {@link #init(int, String)} first, then {@link #getNextBatch()} repeatedly until
 * {@link #hasFinished()} returns {@code true}. The underlying file is released automatically
 * once the end of the file has been reached, when the reader is re-initialised, or when
 * {@link #close()} is called. Instances keep mutable state and are not synchronised.
 */
public class AccidentDataReader {

    private static final Logger log = LoggerFactory.getLogger(AccidentDataReader.class);

    private int batchSize;
    private String dataFileName;
    private int batchCount;
    private int recordCount;
    private boolean hasFinished = false;
    private final RoadAccidentParser roadAccidentParser = new RoadAccidentParser();

    /** Parser owning the open file; kept so that it can be closed. {@code null} when nothing is open. */
    private CSVParser csvParser;
    private Iterator<CSVRecord> recordIterator;


    /**
     * Opens {@code dataFileName} and resets all counters. A previously opened file is closed first.
     *
     * @param batchSize    maximum number of records returned by one {@link #getNextBatch()} call
     * @param dataFileName path of the CSV file to read
     * @throws RuntimeException if the file cannot be opened or parsed
     */
    public void init(int batchSize, String dataFileName){
        close();
        this.batchSize = batchSize;
        this.dataFileName = dataFileName;
        batchCount = 0;
        recordCount = 0;
        hasFinished = false;
        prepareIterator();
    }

    /** Alias of {@link #init(int, String)}. */
    public void reset(int batchSize, String dataFileName){
        init(batchSize, dataFileName);
    }

    /** Opens the data file and positions the record iterator at the first data row. */
    private void prepareIterator(){
        Reader reader = null;
        try{
            reader = new FileReader(dataFileName);
            csvParser = new CSVParser(reader, CSVFormat.EXCEL.withHeader());
            recordIterator = csvParser.iterator();
        }catch (Exception e){
            // Do not leak the file handle when the parser could not be set up.
            closeQuietly(reader);
            csvParser = null;
            log.error("Failed to prepare file iterator for file : {}", dataFileName, e);
            throw new RuntimeException("Failed to prepare file iterator for file : " + dataFileName, e);
        }
    }

    /**
     * Reads up to {@code batchSize} records. Records for which the parser returns {@code null}
     * are skipped and do not count towards the batch size.
     *
     * @return the next batch; empty once the file is exhausted, which also marks the reader as finished
     * @throws IllegalStateException if the reader has not been initialised
     */
    public List<RoadAccident> getNextBatch(){
        if(recordIterator == null){
            throw new IllegalStateException("init(batchSize, dataFileName) must be called before getNextBatch()");
        }
        List<RoadAccident> roadAccidentBatch = new ArrayList<RoadAccident>();
        while(roadAccidentBatch.size() < batchSize && recordIterator.hasNext()){
            RoadAccident roadAccidentItem = roadAccidentParser.parseRecord(recordIterator.next());
            if(roadAccidentItem != null){
                roadAccidentBatch.add(roadAccidentItem);
            }
        }

        int recordCountInCurrBatch = roadAccidentBatch.size();
        if(recordCountInCurrBatch != 0){
            ++batchCount;
            recordCount = recordCount + recordCountInCurrBatch;
        }else {
            hasFinished = true;
            // Nothing left to read: release the file right away.
            close();
        }
        Util.sleep();
        log.info("File : {} - Reading {} batch with {} records in current batch", dataFileName, batchCount, recordCountInCurrBatch);
        return roadAccidentBatch;
    }

    /**
     * @return {@code true} once a call to {@link #getNextBatch()} has returned an empty batch
     */
    public boolean hasFinished(){
        return hasFinished;
    }

    /**
     * Releases the currently opened file, if any. Safe to call repeatedly.
     * Subsequent {@link #getNextBatch()} calls return empty batches until the reader is re-initialised.
     */
    public void close(){
        if(csvParser != null){
            closeQuietly(csvParser);
            csvParser = null;
            // Keep getNextBatch() well-defined after the parser is gone.
            recordIterator = java.util.Collections.<CSVRecord>emptyIterator();
        }
    }

    /** Closes the resource and logs, rather than propagates, a failure to do so. */
    private void closeQuietly(java.io.Closeable resource){
        if(resource == null){
            return;
        }
        try{
            resource.close();
        }catch (java.io.IOException e){
            log.warn("Failed to close data file : {}", dataFileName, e);
        }
    }
}
94 changes: 94 additions & 0 deletions src/main/java/com/epam/concurrency/task/AccidentDataWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.epam.concurrency.task;

import com.epam.data.RoadAccident;
import com.epam.data.RoadAccidentBuilder;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

/**
* Created by Tanmoy on 6/17/2016.
*/
/**
 * Writes enriched road accident records to a CSV file.
 *
 * <p>Call {@link #init(String)} first, {@link #writeAccidentData(List)} for every batch and
 * {@link #close()} when done. The header row is written once, together with the first batch.
 */
public class AccidentDataWriter {

    private String dataFileName;
    // NOTE(review): withHeader() is given no column names while the header row is printed manually
    // in writeAccidentData. Depending on the Commons CSV version the printer may emit an extra
    // empty record for this empty header - verify the first line of the output file.
    private CSVFormat outputDataFormat = CSVFormat.EXCEL.withHeader();
    private CSVPrinter csvFilePrinter;
    private boolean isHeaderWritten;
    private static final Object [] FILE_HEADER = {"Accident_Index","Longitude","Latitude","Police_Force","Accident_Severity","Number_of_Vehicles","Number_of_Casualties","Date","Time","Local_Authority_(District)","LightCondition","WeatherCondition","RoadSafeCondition","Police_Force_Contact"};
    private Logger log = LoggerFactory.getLogger(AccidentDataWriter.class);

    /**
     * (Re)creates the output file and opens a CSV printer on it. An existing file of the same
     * name is replaced, missing parent directories are created, and a printer opened by an
     * earlier call is closed first.
     *
     * @param dataFileName path of the output file
     * @throws RuntimeException if the file or the printer cannot be created
     */
    public void init(String dataFileName){
        // Release the output opened by a previous init(), if any.
        close();
        this.dataFileName = dataFileName;
        FileWriter fileWriter = null;
        try {
            File dataFile = new File(dataFileName);
            File parentDir = dataFile.getAbsoluteFile().getParentFile();
            if (parentDir != null){
                // Files.createFile fails when the parent directory does not exist.
                Files.createDirectories(parentDir.toPath());
            }
            Files.deleteIfExists(dataFile.toPath());
            Files.createFile(dataFile.toPath());
            isHeaderWritten = false;
            fileWriter = new FileWriter(dataFileName);
            csvFilePrinter = new CSVPrinter(fileWriter, outputDataFormat);
        } catch (IOException e) {
            // Do not leak the file handle when the printer could not be created.
            closeQuietly(fileWriter);
            log.error("Failed to create file writer for file {}", dataFileName, e);
            throw new RuntimeException("Failed to create file writer for file " + dataFileName, e);
        }
    }

    /**
     * Appends one record per list element and flushes the printer. The header row is written
     * before the first batch.
     *
     * @param accidentDetailsList the records to write; may be empty
     * @throws RuntimeException if writing to the file fails
     */
    public void writeAccidentData(List<RoadAccidentDetails> accidentDetailsList){
        try {
            if (!isHeaderWritten){
                csvFilePrinter.printRecord(FILE_HEADER);
                isHeaderWritten = true;
            }
            for (RoadAccidentDetails accidentDetails : accidentDetailsList){
                csvFilePrinter.printRecord(getCsvRecord(accidentDetails));
            }
            Util.sleep();
            csvFilePrinter.flush();
        } catch (IOException e) {
            log.error("Failed to write accidentDetails to file {}", dataFileName, e);
            throw new RuntimeException("Failed to write accidentDetails to file " + dataFileName, e);
        }
    }

    /** Maps one record to its CSV column values, in the same order as {@code FILE_HEADER}. */
    private List<String> getCsvRecord(RoadAccidentDetails roadAccidentDetails){
        List<String> record = new ArrayList<>();
        record.add(roadAccidentDetails.getAccidentId());
        record.add(String.valueOf(roadAccidentDetails.getLongitude()));
        record.add(String.valueOf(roadAccidentDetails.getLatitude()));
        record.add(roadAccidentDetails.getPoliceForce());
        record.add(roadAccidentDetails.getAccidentSeverity());
        record.add(String.valueOf(roadAccidentDetails.getNumberOfVehicles()));
        record.add(String.valueOf(roadAccidentDetails.getNumberOfCasualties()));
        record.add(roadAccidentDetails.getDate().toString());
        record.add(roadAccidentDetails.getTime().toString());
        record.add(roadAccidentDetails.getDistrictAuthority());
        record.add(roadAccidentDetails.getLightConditions());
        record.add(roadAccidentDetails.getWeatherConditions());
        record.add(roadAccidentDetails.getRoadSurfaceConditions());
        record.add(roadAccidentDetails.getPoliceForceContact());
        return record;
    }

    /**
     * Closes the printer if one has been opened. A failure to close is logged, not thrown.
     * Calling this before {@link #init(String)} does nothing.
     */
    public void close(){
        if (csvFilePrinter == null){
            return;
        }
        try {
            csvFilePrinter.close();
        }catch (IOException e){
            log.error("Failed to close file writer for file {}", dataFileName, e);
        }
    }

    /** Closes the resource and logs, rather than propagates, a failure to do so. */
    private void closeQuietly(java.io.Closeable resource){
        if (resource == null){
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            log.warn("Failed to close file {}", dataFileName, e);
        }
    }

    /**
     * Manual smoke test: creates the output file and builds a sample accident.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        AccidentDataWriter accidentDataWriter = new AccidentDataWriter();
        try {
            accidentDataWriter.init("target/output/DfTRoadSafety_Accidents_consolidated.csv");
            RoadAccidentBuilder roadAccidentBuilder = new RoadAccidentBuilder("200901BS70001");
            RoadAccident roadAccident = roadAccidentBuilder.withAccidentSeverity("1").build();
        } finally {
            // Release the output file even if the smoke test fails.
            accidentDataWriter.close();
        }

    }
}
Loading