Skip to content

Commit

Permalink
use jackson to covert object to/from json
Browse files Browse the repository at this point in the history
Signed-off-by: Lida He <[email protected]>
  • Loading branch information
hldnova committed May 18, 2018
1 parent 27e0792 commit 7a01f4b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 43 deletions.
1 change: 0 additions & 1 deletion flink-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ dependencies {
compile "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
compile "org.apache.flink:flink-cep_2.11:${flinkVersion}"
compile "org.slf4j:slf4j-log4j12:1.7.14"
compile "com.google.code.gson:gson:2.3.+"
compile "joda-time:joda-time:2.9.+"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,30 @@
*/
package io.pravega.examples.flink.alert;

import io.pravega.shaded.com.google.gson.Gson;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.DateTime;

/**
* Object to process Apache access log
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class AccessLog {
private String ClientIP;

private String Status;
private long Timestamp;

private String Verb;

private String TimestampStr;

public AccessLog(){
Status=Verb=ClientIP="";
Timestamp=0L;
}

@JsonProperty("clientip")
public String getClientIP() {
return ClientIP;
}
Expand All @@ -34,6 +42,7 @@ public void setClientIP(String clientIP) {
ClientIP = clientIP;
}

@JsonProperty("response")
public String getStatus() {
return Status;
}
Expand All @@ -42,14 +51,16 @@ public void setStatus(String status) {
Status = status;
}

public long getTimestamp() {
return Timestamp;
}
@JsonProperty("@timestamp")
public String getTimestampStr() { return TimestampStr; }

public void setTimestampStr(String timestampStr) { TimestampStr = timestampStr; }

public void setTimestamp(long timestamp) {
this.Timestamp = timestamp;
public long getTimestampMillis() {
return new DateTime(getTimestampStr()).getMillis();
}

@JsonProperty("verb")
public String getVerb() {
return Verb;
}
Expand All @@ -74,7 +85,7 @@ public boolean equals(Object obj) {
AccessLog accessLog =(AccessLog)obj;
return accessLog.Verb.equals(Verb) &&
accessLog.Status.equals(Status) &&
accessLog.Timestamp==Timestamp &&
accessLog.TimestampStr.equals(TimestampStr) &&
accessLog.ClientIP.equals(ClientIP);
}

Expand All @@ -83,15 +94,19 @@ public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((Status == null) ? 0 : Status.hashCode());
result = prime * result + (int) (Timestamp ^ (Timestamp >>> 32));
result = prime * result + ((ClientIP == null) ? 0 : ClientIP.hashCode());
result = prime * result + ((TimestampStr == null) ? 0 : TimestampStr.hashCode());
result = prime * result + ((Verb == null) ? 0 : Verb.hashCode());
return result;
}

@Override
public String toString() {
Gson gson = new Gson();
return gson.toJson(this);
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return "AccessLog: Timestamp=" + getTimestampStr() +", ClientIP=" + getClientIP() + ", Verb=" + getVerb() + ", Status=" + getStatus();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@
*/
package io.pravega.examples.flink.alert;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.pravega.connectors.flink.FlinkPravegaReader;
import io.pravega.connectors.flink.util.FlinkPravegaParams;
import io.pravega.connectors.flink.util.StreamId;
import io.pravega.shaded.com.google.gson.Gson;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
Expand All @@ -25,11 +22,11 @@
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -94,7 +91,7 @@ public void flatMap(AccessLog value, Collector<ResponseCount> out) throws Except
out.collect(new ResponseCount(value.getStatus(), 1));
}
}).filter((FilterFunction<ResponseCount>) count -> {
return !count.response.equals("500");
return count.response.equals("500");
}).keyBy("response")
.timeWindow(Time.seconds(Constants.ALERT_WINDOW), Time.seconds(Constants.ALERT_INTERVAL))
.sum("count");
Expand Down Expand Up @@ -134,31 +131,9 @@ public Alert select(Map<String, List<ResponseCount>> pattern) throws Exception {

//Parse the incoming streams & convert into Java PoJos
private static class ParseLogData implements MapFunction<String, AccessLog>{
public AccessLog map(String record) throws Exception {
// TODO: handle exceptions
Gson gson = new Gson();
AccessLog accessLog = new AccessLog();
JsonParser parser = new JsonParser();
JsonObject obj = parser.parse(record).getAsJsonObject();
if (obj.has("verb")) {
String verb = obj.get("verb").getAsString();
accessLog.setVerb(verb);
}
if (obj.has("response")) {
String response = obj.get("response").getAsString();
accessLog.setStatus(response);
}
if (obj.has("@timestamp")) {
String timestamp = obj.get("@timestamp").getAsString();

DateTime dateTime = new DateTime(timestamp);
accessLog.setTimestamp(dateTime.getMillis());
}
if (obj.has("clientip")) {
String client = obj.get("clientip").getAsString();
accessLog.setClientIP(client);
}
return accessLog;
public AccessLog map(String value) throws Exception {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(value, AccessLog.class);
}
}

Expand Down

0 comments on commit 7a01f4b

Please sign in to comment.