example to generate alert from apache access log via logstash pravega output plugin

Signed-off-by: Lida He <[email protected]>

add example to process apache access log and generate high 500 response alert

logstash config and first cut of readme for alert sample

instruction to run high count alerter sample.

instruction to run the high count alerter sample

print output to stdout

Add flink references

update flink references

Steps to use wordCountWriter  instead of logstash
Signed-off-by: Lida He <[email protected]>

Add link to readme file for high error count alerter
Signed-off-by: Lida He <[email protected]>

Update README.md
Signed-off-by: Lida He <[email protected]>

add license to conf files
Signed-off-by: Lida He <[email protected]>

read access log from file instead of stdin
Signed-off-by: Lida He <[email protected]>

Update instruction to read access log from file.
Signed-off-by: Lida He <[email protected]>

Update instruction to read from file.
Signed-off-by: Lida He <[email protected]>

Update instruction to read access log from file
Signed-off-by: Lida He <[email protected]>

Update README.md
Signed-off-by: Lida He <[email protected]>

Update README.md

Signed-off-by: Lida He <[email protected]>
Signed-off-by: Lida He <[email protected]>
hldnova committed May 14, 2018
1 parent 334b633 commit 502e2e6
Showing 11 changed files with 572 additions and 0 deletions.
6 changes: 6 additions & 0 deletions flink-examples/README.md
@@ -31,3 +31,9 @@ This example demonstrates how to use the Pravega Flink Connectors to write data
from an external network stream into a Pravega stream and read the data from the Pravega stream.
See [Flink Word Count Sample](doc/flink-wordcount/README.md) for instructions.

## High Error Count Alert

This example demonstrates how to use the Pravega Flink connectors to read and
parse Apache access logs from logstash via the [logstash pravega output plugin](https://github.com/pravega/logstash-output-pravega),
and how to generate an alert when the error count is high within a time frame.
See [High Error Count Alert](doc/flink-high-error-count-alert/README.md) for instructions.
11 changes: 11 additions & 0 deletions flink-examples/build.gradle
@@ -30,7 +30,10 @@ dependencies {
compile "io.pravega:pravega-connectors-flink_2.11:${connectorVersion}"
compile "org.apache.flink:flink-streaming-java_2.11:${flinkVersion}"
compile "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
compile "org.apache.flink:flink-cep_2.11:${flinkVersion}"
compile "org.slf4j:slf4j-log4j12:1.7.14"
compile "com.google.code.gson:gson:2.3.+"
compile "joda-time:joda-time:2.9.+"
}

shadowJar {
@@ -54,6 +57,13 @@ task scriptWordCountReader(type: CreateStartScripts) {
classpath = files(jar.archivePath) + sourceSets.main.runtimeClasspath
}

task scriptFlinkAlerter(type: CreateStartScripts) {
outputDir = file('build/scripts')
mainClassName = 'io.pravega.examples.flink.alert.HighCountAlerter'
applicationName = 'highCountAlerter'
classpath = files(jar.archivePath) + sourceSets.main.runtimeClasspath
}

distributions {
main {
baseName = archivesBaseName
@@ -67,6 +77,7 @@ distributions {
into('bin') {
from project.scriptWordCountWriter
from project.scriptWordCountReader
from project.scriptFlinkAlerter
}
}
}
133 changes: 133 additions & 0 deletions flink-examples/doc/flink-high-error-count-alert/README.md
@@ -0,0 +1,133 @@
# High Count Alert #

The application reads Apache access logs from a Pravega stream. Once every 2 seconds
it counts the number of 500 responses seen in the last 30 seconds, and it generates
an alert when that count reaches 6 or more.
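
The counting rule can be illustrated in plain Java without Flink. This is only a minimal sketch under stated assumptions: the class `SlidingErrorCounter` and its methods are hypothetical and are not part of this sample, which runs as a Flink job. The window length and threshold mirror the 30-second window and the threshold of 6 used by the sample, and the alert fires once the count reaches the threshold, as in the sample output later in this document.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper, not part of the sample: counts events in a trailing time window. */
final class SlidingErrorCounter {
    static final long WINDOW_MILLIS = 30_000L; // 30-second window
    static final int ALERT_THRESHOLD = 6;      // alert once the count reaches this value

    // Timestamps of recorded 500 responses, oldest first.
    private final Deque<Long> timestamps = new ArrayDeque<>();

    /** Records one 500 response seen at the given time (milliseconds, non-decreasing). */
    void record(long timestampMillis) {
        timestamps.addLast(timestampMillis);
    }

    /** Drops events that fell out of the window and returns how many remain. */
    int countAt(long nowMillis) {
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= nowMillis - WINDOW_MILLIS) {
            timestamps.removeFirst();
        }
        return timestamps.size();
    }

    /** Returns true when the windowed count has reached the threshold. */
    boolean shouldAlert(long nowMillis) {
        return countAt(nowMillis) >= ALERT_THRESHOLD;
    }

    public static void main(String[] args) {
        SlidingErrorCounter counter = new SlidingErrorCounter();
        // Simulate one 500 response per second, evaluated every 2 seconds.
        for (long t = 1_000L; t <= 8_000L; t += 1_000L) {
            counter.record(t);
            if (t % 2_000L == 0) {
                System.out.println("t=" + (t / 1000) + "s count=" + counter.countAt(t)
                        + " alert=" + counter.shouldAlert(t));
            }
        }
    }
}
```

A deque keeps the sketch simple because events arrive in time order, so expired entries are always at the front. In a Flink job the framework's own windowing would take the place of this bookkeeping.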

## Prerequisites ##

A Docker image containing Pravega and Logstash has been prepared to simplify the demo. Skip ahead to the **Run in Docker Container** section in this document if you have a Docker environment handy.

Otherwise, proceed to set up Logstash and Pravega:

1. Logstash installed, see [Install logstash](https://www.elastic.co/guide/en/logstash/5.6/installing-logstash.html).
2. Pravega running, see [here](http://pravega.io/docs/latest/getting-started/) for instructions.

## Start Logstash with Pravega Output Plugin ##

On the Logstash host, download the plugin gem file from [Logstash Pravega output plugin](https://github.com/pravega/logstash-output-pravega/releases), for example, `logstash-output-pravega-0.2.0.gem`.

Install the plugin, assuming Logstash is installed at `/usr/share/logstash/`
```
$ /usr/share/logstash/bin/logstash-plugin install logstash-output-pravega-0.2.0.gem
```

Copy the contents of flink-examples/doc/flink-high-error-count-alert/filters/ to the Logstash host, e.g., into the directory ~/pravega.
Then update **pravega_endpoint** in ~/pravega/90-pravega-output.conf:

```
output {
pravega {
pravega_endpoint => "tcp://127.0.0.1:9090" <- update to point to your Pravega controller
stream_name => "apacheaccess"
scope => "myscope"
}
}
```

Start Logstash, assuming it is installed at /usr/share/logstash/bin.
Note that Logstash may take a minute or two to start. For troubleshooting, the Logstash log files are
normally at /var/log/logstash. To restart, type Ctrl-C, then re-run the command.

```
$ sudo /usr/share/logstash/bin/logstash -f ~/pravega
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
```

Normally Logstash is configured to receive data from remote log shippers, such as Filebeat. For simplicity, in this demo
Logstash is configured to read data from /tmp/access.log.

## Run in Docker Container ##

Create a file at /tmp/access.log
```
$ touch /tmp/access.log
```

Run the script below to start a container from the prebuilt image. Adjust the parameters to your needs.
```
#!/bin/sh
set -u
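# Assumed default; 'set -u' aborts the script if this is left undefined. Point it at your Pravega controller.
PRAVEGA_ENDPOINT=tcp://127.0.0.1:9090
PRAVEGA_SCOPE=myscope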
PRAVEGA_STREAM=apacheaccess
CONTAINER_NAME=pravega
IMAGE_NAME=emccorp/pravega-demo
docker run -d --name $CONTAINER_NAME \
-p 9090:9090 \
-p 9091:9091 \
-v /tmp/access.log:/opt/data/access.log \
-v /tmp/logs/:/var/log/pravega/ \
-e PRAVEGA_ENDPOINT=${PRAVEGA_ENDPOINT} \
-e PRAVEGA_SCOPE=${PRAVEGA_SCOPE} \
-e PRAVEGA_STREAM=${PRAVEGA_STREAM} \
${IMAGE_NAME}
```

More details can be found on GitHub at [pravega docker](https://github.com/hldnova/pravega-docker) and on Docker Hub at [pravega docker image](https://hub.docker.com/r/emccorp/pravega-demo/).

## Run HighCountAlerter ##

Run the alerter. Adjust the controller and scope/stream if necessary.
```
$ cd flink-examples/build/install/pravega-flink-examples
$ bin/highCountAlerter [--controller tcp://127.0.0.1:9090] [--stream myscope/apacheaccess]
```
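
The two optional arguments can be thought of as `--key value` pairs with defaults, where the stream is given in qualified `scope/stream` form. The sketch below shows one way to read them in plain Java. It is an illustration only: the class `AlerterArgs` is hypothetical and is not how the sample parses its arguments. The default values are the ones defined in the sample's `Constants` class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not the sample's code: reads "--key value" pairs with defaults. */
final class AlerterArgs {
    static final String DEFAULT_CONTROLLER = "tcp://127.0.0.1:9090";
    static final String DEFAULT_STREAM = "myscope/apacheaccess";

    final String controller;
    final String scope;
    final String stream;

    private AlerterArgs(String controller, String scope, String stream) {
        this.controller = controller;
        this.scope = scope;
        this.stream = stream;
    }

    /** Parses the arguments; a trailing option name without a value is ignored. */
    static AlerterArgs parse(String[] args) {
        Map<String, String> options = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected an option name, got: " + args[i]);
            }
            options.put(args[i].substring(2), args[i + 1]);
        }
        String controller = options.getOrDefault("controller", DEFAULT_CONTROLLER);
        String qualified = options.getOrDefault("stream", DEFAULT_STREAM);
        // Split the qualified name at the first slash into scope and stream.
        int slash = qualified.indexOf('/');
        if (slash <= 0 || slash == qualified.length() - 1) {
            throw new IllegalArgumentException("Stream must be given as scope/stream: " + qualified);
        }
        return new AlerterArgs(controller, qualified.substring(0, slash),
                qualified.substring(slash + 1));
    }

    public static void main(String[] args) {
        AlerterArgs parsed = parse(args);
        System.out.println("controller=" + parsed.controller
                + " scope=" + parsed.scope + " stream=" + parsed.stream);
    }
}
```

With no arguments the sketch falls back to the controller at `tcp://127.0.0.1:9090` and the stream `apacheaccess` in scope `myscope`, which match the Logstash output configuration shown earlier.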

## Input Data ##

Add access logs to /tmp/access.log, e.g., by running the command below every one or two seconds.
```
echo '10.1.1.11 - peter [19/Mar/2018:02:24:01 -0400] "PUT /mapping/ HTTP/1.1" 500 182 "http://example.com/myapp" "python-client"' >> /tmp/access.log
```

Logstash will push the data to Pravega as a JSON string, e.g.,
```
{
"request" => "/mapping/",
"agent" => "\"python-client\"",
"auth" => "peter",
"ident" => "-",
"verb" => "PUT",
"message" => "10.1.1.11 - peter [19/Mar/2018:02:24:01 -0400] \"PUT /mapping/ HTTP/1.1\" 500 182 \"http://example.com/myapp\" \"python-client\"",
"referrer" => "\"http://example.com/myapp\"",
"@timestamp" => 2018-03-19T06:24:01.000Z,
"response" => "500",
"bytes" => "182",
"clientip" => "10.1.1.11",
"@version" => "1",
"host" => "lglca061.lss.emc.com",
"httpversion" => "1.1"
}
```
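
To make the mapping from a raw log line to these fields concrete, the following plain-Java sketch splits one line with a regular expression and converts its timestamp to UTC. It is an approximation written for this document: the class `AccessLogLineParser` is hypothetical, and the regular expression is a simplified stand-in for the `COMBINEDAPACHELOG` grok pattern rather than an equivalent of it. The timestamp pattern is the one used by the Logstash `date` filter in this sample.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: extracts fields from one line in Apache combined log format. */
final class AccessLogLineParser {
    // Simplified approximation of the combined log format, not the grok pattern itself.
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) HTTP/(\\S+)\" "
            + "(\\d{3}) (\\S+) (\"[^\"]*\") (\"[^\"]*\")$");

    // Same pattern as the Logstash date filter: dd/MMM/yyyy:HH:mm:ss Z
    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    /** Returns the extracted fields, keyed by the names Logstash uses. */
    static Map<String, String> parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a combined log line: " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("clientip", m.group(1));
        fields.put("ident", m.group(2));
        fields.put("auth", m.group(3));
        // Convert the local timestamp with its offset to a UTC instant.
        fields.put("@timestamp", OffsetDateTime.parse(m.group(4), TIMESTAMP).toInstant().toString());
        fields.put("verb", m.group(5));
        fields.put("request", m.group(6));
        fields.put("httpversion", m.group(7));
        fields.put("response", m.group(8));
        fields.put("bytes", m.group(9));
        fields.put("referrer", m.group(10)); // quotes kept, as in the Logstash output
        fields.put("agent", m.group(11));    // quotes kept, as in the Logstash output
        return fields;
    }

    public static void main(String[] args) {
        String line = "10.1.1.11 - peter [19/Mar/2018:02:24:01 -0400] "
                + "\"PUT /mapping/ HTTP/1.1\" 500 182 \"http://example.com/myapp\" \"python-client\"";
        parse(line).forEach((key, value) -> System.out.println(key + " => " + value));
    }
}
```

The alerter only needs a few of these fields, chiefly `response` and `@timestamp`; the rest are shown to mirror the Logstash output.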

## View Alert ##
In the HighCountAlerter window, you should see output like the following. Once the count of 500 responses reaches 6 or more, it
prints **High 500 responses** alerts.
```
3> Response count: 500 : 1
3> Response count: 500 : 2
3> Response count: 500 : 4
3> Response count: 500 : 6
2> High 500 responses: 500 : 6
3> Response count: 500 : 8
3> High 500 responses: 500 : 8
3> Response count: 500 : 8
2> High 500 responses: 500 : 8
3> Response count: 500 : 7
3> High 500 responses: 500 : 7
3> Response count: 500 : 5
3> Response count: 500 : 3
3> Response count: 500 : 1
```
@@ -0,0 +1,17 @@
#
# Copyright (c) 2018 Dell Inc., or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#

input {
file {
path => "/tmp/access.log"
start_position => beginning
}
}

@@ -0,0 +1,23 @@
#
# Copyright (c) 2018 Dell Inc., or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#

filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
mutate {
remove_field => [ "timestamp" ]
}

}

@@ -0,0 +1,18 @@
#
# Copyright (c) 2018 Dell Inc., or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#

output {
pravega {
pravega_endpoint => "tcp://127.0.0.1:9090"
stream_name => "apacheaccess"
scope => "myscope"
}
}

@@ -0,0 +1,14 @@
#
# Copyright (c) 2018 Dell Inc., or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#

output {
stdout { codec => rubydebug }
}

22 changes: 22 additions & 0 deletions flink-examples/src/main/dist/bin/create-stream.sh
@@ -0,0 +1,22 @@
#!/bin/bash
#
# sample script to create scope and stream using Pravega REST API
#
host=localhost
port=9091
scope=myscope
stream=apacheaccess
curl -v -H "Content-Type: application/json" $host:${port}/v1/scopes \
-d '{
"scopeName": "'${scope}'"
}'

curl -v -H "Content-Type: application/json" $host:${port}/v1/scopes/${scope}/streams \
-d '{
"streamName": "'${stream}'",
"scopeName": "'${scope}'",
"scalingPolicy":{
"type": "FIXED_NUM_SEGMENTS",
"minSegments": 1
}
}'
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2018 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*/
package io.pravega.examples.flink.alert;

import io.pravega.shaded.com.google.gson.Gson;

/**
* Object to process Apache access log
*/
public class AccessLog {
private String ClientIP;
private String Status;
private long Timestamp;
private String Verb;

public AccessLog(){
Status=Verb=ClientIP="";
Timestamp=0L;
}

public String getClientIP() {
return ClientIP;
}

public void setClientIP(String clientIP) {
ClientIP = clientIP;
}

public String getStatus() {
return Status;
}

public void setStatus(String status) {
Status = status;
}

public long getTimestamp() {
return Timestamp;
}

public void setTimestamp(long timestamp) {
this.Timestamp = timestamp;
}

public String getVerb() {
return Verb;
}

public void setVerb(String verb) {
Verb = verb;
}

/**
* The events in the DataStream to which you want to apply pattern matching must
* implement proper equals() and hashCode() methods because these are used for
* comparing and matching events.
*/
@Override
public boolean equals(Object obj) {
if(this==obj){
return true;
}
if(!(obj instanceof AccessLog)){
return false;
}
AccessLog accessLog =(AccessLog)obj;
return accessLog.Verb.equals(Verb) &&
accessLog.Status.equals(Status) &&
accessLog.Timestamp==Timestamp &&
accessLog.ClientIP.equals(ClientIP);
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((Status == null) ? 0 : Status.hashCode());
result = prime * result + (int) (Timestamp ^ (Timestamp >>> 32));
result = prime * result + ((ClientIP == null) ? 0 : ClientIP.hashCode());
result = prime * result + ((Verb == null) ? 0 : Verb.hashCode());
return result;
}

@Override
public String toString() {
Gson gson = new Gson();
return gson.toJson(this);
}
}
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2018 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*/
package io.pravega.examples.flink.alert;

/**
* Defines a handful of constants shared by classes in this package.
*
*/
public class Constants {
protected static final String STREAM_PARAM = "stream";
protected static final String DEFAULT_STREAM = "myscope/apacheaccess";
protected static final String CONTROLLER_PARAM = "controller";
protected static final String DEFAULT_CONTROLLER = "tcp://127.0.0.1:9090";
protected static final Integer ALERT_THRESHOLD = 6;
protected static final Integer ALERT_WINDOW = 30;
protected static final Integer ALERT_INTERVAL = 2;
}