Created Flink Data Pipeline that sends DAI MOJO Predictions to Kafka
Also composed documentation on how to run the Flink Kafka data pipeline code for sending DAI MOJO SP predictions to
a Kafka topic. Added images too.

Also updated the pom.xml with the universal Flink Kafka connector
james94 committed Sep 26, 2020
1 parent d4c5158 commit 3d0fa6c
Showing 7 changed files with 334 additions and 7 deletions.
18 changes: 11 additions & 7 deletions mojo-flink/daimojo-flink-data-pipeline/pom.xml
@@ -4,20 +4,20 @@

<groupId>daimojo-flink-data-pipeline</groupId>
<artifactId>daimojo-flink-data-pipeline</artifactId>
- <version>1.11.1</version>
+ <version>1.10.0</version>
<packaging>jar</packaging>

<name>Driverless AI MOJO Flink Job</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <flink.version>1.11.1</flink.version>
+ <flink.version>1.10.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
- <daimojo.version>2.4.2</daimojo.version>
+ <daimojo.version>2.4.8</daimojo.version>
</properties>

<repositories>
@@ -55,8 +55,13 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <!-- Kafka connector for Flink 1.10.0 -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
- <!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
@@ -78,7 +83,7 @@
<artifactId>opencsv</artifactId>
<version>[5.2]</version>
</dependency>
- <!-- Driverless AI v1.8.7.1 MOJO Scoring Pipeline v2.4.2 dependencies -->
+ <!-- Driverless AI v1.9.0 MOJO Scoring Pipeline v2.4.8 dependencies -->
<!-- https://mvnrepository.com/artifact/ai.h2o/mojo2-runtime-api -->
<dependency>
<groupId>ai.h2o</groupId>
@@ -90,7 +95,7 @@
<groupId>ai.h2o</groupId>
<artifactId>mojo2-runtime-impl</artifactId>
<version>${daimojo.version}</version>
- </dependency>
+ </dependency>
</dependencies>

<build>
@@ -126,7 +131,6 @@
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
- <exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
46 changes: 46 additions & 0 deletions mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/AddKeyToPrediction.java
@@ -0,0 +1,46 @@
package org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import com.opencsv.exceptions.CsvValidationException;

import ai.h2o.mojos.runtime.MojoPipeline;
import ai.h2o.mojos.runtime.api.MojoPipelineService;
import ai.h2o.mojos.runtime.frame.MojoFrameMeta;
import ai.h2o.mojos.runtime.lic.LicenseException;

@SuppressWarnings("serial")
public class AddKeyToPrediction extends RichMapFunction<String, String> {
    private MojoPipeline model;
    private final File pipelineMojoFilePath;
    private MojoFrameMeta predMojoFrameMeta;
    private String predHeaderKey;

    public AddKeyToPrediction(File pipelineMojoPath) {
        pipelineMojoFilePath = pipelineMojoPath;
    }

    /*
     * Initialization method for the function.
     * Called once before the actual working method (map), so it is suitable for one-time setup:
     * load the MOJO pipeline and derive the key from its output column names.
     */
    @Override
    public void open(Configuration parameters) throws IOException, LicenseException {
        model = MojoPipelineService.loadPipeline(pipelineMojoFilePath);
        predMojoFrameMeta = model.getOutputMeta();
        // Arrays.toString() renders "[col1, col2, ...]"; strip the brackets and whitespace to get "col1,col2,..."
        predHeaderKey = Arrays.toString(predMojoFrameMeta.getColumnNames());
        predHeaderKey = predHeaderKey.replaceAll("\\[", "").replaceAll("\\]", "").replaceAll("\\s+", "");
    }

    @Override
    public String map(String inputData) throws CsvValidationException, IOException, LicenseException {
        // Prepend the prediction header as the key, producing a "header:scores" string for Kafka
        String addKeyToString = predHeaderKey + ":" + inputData;
        System.out.println(addKeyToString);
        return addKeyToString;
    }
}
78 changes: 78 additions & 0 deletions mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/RealTimePredHydCoolCondKafka.java
@@ -0,0 +1,78 @@
package org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond;

import java.io.File;
import java.util.Properties;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * Flink Stream Scoring Job.
 *
 * Executes an H2O.ai Driverless AI MOJO Scoring Pipeline on a real-time stream of Hydraulic
 * Sensor data to classify the Hydraulic Cooling Condition, then pushes the predictions to a Kafka topic.
 */
@SuppressWarnings("deprecation")
public class RealTimePredHydCoolCondKafka {

    private static final String homePath = System.getProperty("user.home");

    public static void main(String[] args) throws JobExecutionException {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameter = ParameterTool.fromArgs(args);

        File pathToDaiMojo = new File(homePath + "/dai-model-deployment/mojo-pipeline/pipeline.mojo");
        String pathToHydraulicData = homePath + "/dai-model-deployment/testData/test-real-time-data/";
        DataStream<String> hydraulic = getRealTimeData(env, pathToHydraulicData, "Get Real-Time Data");

        DataStream<String> predHydraulic = predictRealTimeData(hydraulic, pathToDaiMojo, "Run DAI Mojo Real-Time Scoring");

        DataStream<String> streamKeyedScores = addKeyToPrediction(predHydraulic, pathToDaiMojo, "Add Key To Prediction for Kafka");

        // Required command-line arguments, e.g.
        // --topic dai-mojo-predictions --bootstrap.servers host:9092 --zookeeper.connect host:2181
        String sinkTopic = parameter.getRequired("topic");
        String bootstrapServers = parameter.getRequired("bootstrap.servers");
        String zookeeperConnect = parameter.getRequired("zookeeper.connect");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", sinkTopic);
        properties.setProperty("zookeeper.connect", zookeeperConnect);
        properties.setProperty("parse.key", "true");
        properties.setProperty("key.separator", ":");

        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>(
            sinkTopic, new SimpleStringSchema(), properties);

        streamKeyedScores.addSink(flinkKafkaProducer).name("Push RT Scores to Kafka Topic");

        executeFlinkStreamJob(env, "Deploy DAI Mojo SP within a Flink Kafka Streaming Data Pipeline");
    }

    /* Pull in real-time data: each file holds an individual row of data */
    private static DataStream<String> getRealTimeData(StreamExecutionEnvironment env, String pathToData, String operatorName) {
        return env.readTextFile(pathToData).name(operatorName);
    }

    /* Perform predictions on incoming hydraulic system data using the Driverless AI MOJO Scoring Pipeline */
    private static DataStream<String> predictRealTimeData(DataStream<String> dataStream, File pathToMojoScorer, String operatorName) {
        DaiMojoTransform daiMojoTransform = new DaiMojoTransform(pathToMojoScorer);
        return dataStream.map(daiMojoTransform).name(operatorName);
    }

    /* Prepend the prediction header as a key so Kafka consumers receive "header:scores" strings */
    private static DataStream<String> addKeyToPrediction(DataStream<String> dataStream, File pathToMojoScorer, String operatorName) {
        AddKeyToPrediction addKeyToData = new AddKeyToPrediction(pathToMojoScorer);
        return dataStream.map(addKeyToData).name(operatorName);
    }

    /* Throw a more specific exception: env.execute() declares throws Exception, which is narrowed here */
    private static void executeFlinkStreamJob(StreamExecutionEnvironment env, String jobName) throws JobExecutionException {
        try {
            env.execute(jobName);
        } catch (Exception e) {
            throw new JobExecutionException(null, jobName, e);
        }
    }
}
199 changes: 199 additions & 0 deletions mojo-flink/daimojo-flink-kafka.md
@@ -0,0 +1,199 @@
# Send Driverless AI MOJO Predictions to Kafka using FlinkKafkaProducer

## Cloudera Integration Point for CDF

Deploy the Driverless AI MOJO Scoring Pipeline to Apache Flink by using the MOJO2 Java Runtime API and a custom Flink RichMapFunction, then send the predictions performed by the MOJO to a Kafka topic using FlinkKafkaProducer. This serves as a Cloudera integration point for Cloudera Data Flow (CDF), particularly Cloudera Streaming Analytics (CSA), which is powered by Apache Flink.

## Video Walkthrough

The following YouTube video shows how to deploy the Driverless AI MOJO to Flink to perform real-time predictions on Hydraulic System data to classify the Hydraulic Cooling Condition, and then send those predictions to a Kafka topic: [Send Driverless AI MOJO Predictions to Kafka using FlinkKafkaProducer](https://youtu.be/hqd3ebEqkuk)

## Prerequisites

- Driverless AI Environment (Tested with Driverless AI 1.9.0, MOJO Scoring Pipeline 2.4.8)

- Launch an Ubuntu 18.04 Linux EC2 instance
  - Instance Type: t2.2xlarge
  - Storage: 128GB
  - Open traffic on all ports on 0.0.0.0/0

## Task 1: Set Up Environment

### Connect to EC2 from Local Machine

~~~bash
# Connect to EC2 instance using SSH
ssh -i $DAI_MOJO_CDF_PEM ubuntu@$DAI_MOJO_CDF_INSTANCE
~~~

### Create Environment Directory Structure

~~~bash
mkdir -p $HOME/dai-model-deployment/testData/{test-batch-data,test-real-time-data}
~~~

### Set Up Driverless AI MOJO Requirements in EC2

1\. Build a **Driverless AI Experiment**

2\. Send **mojo.zip** to EC2 instance

~~~bash
# Send DAI MOJO SP from local machine to EC2 instance
scp -i $DAI_MOJO_CDF_PEM $HOME/Downloads/mojo.zip ubuntu@$DAI_MOJO_CDF_INSTANCE:/home/ubuntu/dai-model-deployment/

# On EC2 instance, extract mojo.zip
cd $HOME/dai-model-deployment/
sudo apt -y install unzip
unzip mojo.zip
~~~
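
Extracting **mojo.zip** creates a `mojo-pipeline/` folder. As a quick sanity check (exact contents can vary by Driverless AI version, so treat this listing as typical rather than definitive):

~~~bash
ls $HOME/dai-model-deployment/mojo-pipeline/
# Typical contents:
#   pipeline.mojo     - the MOJO Scoring Pipeline artifact itself
#   mojo2-runtime.jar - the MOJO2 Java Runtime
#   example.csv       - sample input rows matching the model's schema
#   run_example.sh    - helper script that scores example.csv
~~~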

3\. Install **package dependencies** for MOJO2 Java Runtime

~~~bash
wget https://repo.anaconda.com/archive/Anaconda3-2020.02-Linux-x86_64.sh -O $HOME/anaconda.sh
bash $HOME/anaconda.sh
source ~/.bashrc
conda create -y -n model-deployment python=3.6
conda activate model-deployment
conda install -y -c conda-forge openjdk=8.0.192
conda install -y -c conda-forge maven
~~~

4\. Set a **temporary environment variable** for the **Driverless AI License File**

~~~bash
# From your local machine, send the license to the EC2 instance
scp -i $DAI_MOJO_CDF_PEM $HOME/Downloads/license.sig ubuntu@$DAI_MOJO_CDF_INSTANCE:/home/ubuntu/

# On the EC2 instance, point the MOJO runtime at the license
export DRIVERLESS_AI_LICENSE_FILE="/home/ubuntu/license.sig"
~~~
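
The Flink jobs in this repo score data through the MOJO2 Java Runtime API. Below is a minimal standalone sketch of the same calls, loading `pipeline.mojo` and scoring a single row; the column name `psa_bar` and its value are hypothetical, so substitute the input columns from your own experiment:

~~~java
import java.io.File;

import ai.h2o.mojos.runtime.MojoPipeline;
import ai.h2o.mojos.runtime.api.MojoPipelineService;
import ai.h2o.mojos.runtime.frame.MojoFrame;
import ai.h2o.mojos.runtime.frame.MojoFrameBuilder;
import ai.h2o.mojos.runtime.frame.MojoRowBuilder;

public class MojoScoringSketch {
    public static void main(String[] args) throws Exception {
        // The license is picked up from DRIVERLESS_AI_LICENSE_FILE (set above)
        // or from the -Dai.h2o.mojos.runtime.license.file JVM property
        MojoPipeline model = MojoPipelineService.loadPipeline(
            new File(System.getProperty("user.home") + "/dai-model-deployment/mojo-pipeline/pipeline.mojo"));

        // Build a one-row input frame from column-name/value pairs
        MojoFrameBuilder frameBuilder = model.getInputFrameBuilder();
        MojoRowBuilder rowBuilder = frameBuilder.getMojoRowBuilder();
        rowBuilder.setValue("psa_bar", "155.6"); // hypothetical column and value
        frameBuilder.addRow(rowBuilder);

        // Run the scoring pipeline and dump the output frame (prediction columns) to stdout
        MojoFrame outputFrame = model.transform(frameBuilder.toMojoFrame());
        outputFrame.debug();
    }
}
~~~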

### Prepare Hydraulic Test Data For Mojo Flink Scoring

1\. Set up the batch data in the test-batch-data folder:

~~~bash
cd /home/ubuntu/dai-model-deployment/mojo-pipeline/
cp example.csv /home/ubuntu/dai-model-deployment/testData/test-batch-data/
~~~

2\. Set up the real-time data in the test-real-time-data folder:

~~~bash
cd /home/ubuntu/dai-model-deployment/testData/test-real-time-data/
cp /home/ubuntu/dai-model-deployment/mojo-pipeline/example.csv .
# Drop the CSV header row so only data rows remain
echo -e "$(sed '1d' example.csv)\n" > example.csv
# Split into one single-row file per prediction request: test_00.csv, test_01.csv, ...
split -dl 1 --additional-suffix=.csv example.csv test_
rm -f example.csv
~~~
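
`split -d` numbers its output files; each one now holds a single row of sensor data that the Flink job will treat as one real-time event:

~~~bash
ls $HOME/dai-model-deployment/testData/test-real-time-data/ | head -n 3
# test_00.csv
# test_01.csv
# test_02.csv
~~~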

### Set Up Kafka Local Cluster in EC2

1\. Download **Kafka**

~~~bash
cd $HOME
wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.12-2.4.1.tgz
tar xzf kafka_2.12-2.4.1.tgz
cd kafka_2.12-2.4.1
~~~

2\. Start the **Kafka Environment**

In the current terminal, start the ZooKeeper service:

~~~bash
# Start the ZooKeeper service
bin/zookeeper-server-start.sh config/zookeeper.properties
~~~

Open a new terminal to EC2 and start the Kafka broker service:

~~~bash
# Start the Kafka broker service
cd kafka_2.12-2.4.1

bin/kafka-server-start.sh config/server.properties
~~~

3\. Open a new terminal to EC2 and create a **dai-mojo-predictions** Kafka topic to store your predictions:

~~~bash
cd kafka_2.12-2.4.1

bin/kafka-topics.sh --create \
--zookeeper ${EC2_PUBLIC_DNS}:2181 \
--replication-factor 1 --partitions 1 \
--topic dai-mojo-predictions
~~~
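
To confirm the topic exists before wiring Flink to it, the same CLI can describe it:

~~~bash
bin/kafka-topics.sh --describe \
    --zookeeper ${EC2_PUBLIC_DNS}:2181 \
    --topic dai-mojo-predictions
~~~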

### Set Up Flink Local Cluster in EC2

1\. Download **Flink**

~~~bash
cd $HOME
wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.12.tgz
tar xzf flink-1.10.0-bin-scala_2.12.tgz
~~~

2\. Add the Driverless AI license file as a JVM system property for all Flink processes in `flink-conf.yaml`:

~~~bash
echo "# Java options to start the JVM of all Flink processes with" | tee -a /home/ubuntu/flink-1.10.0/conf/flink-conf.yaml
echo "env.java.opts=\"-Dai.h2o.mojos.runtime.license.file=/home/ubuntu/license.sig\"" | tee -a /home/ubuntu/flink-1.10.0/conf/flink-conf.yaml
~~~
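
The two `tee -a` commands simply append to the config, so the tail of the file should now read:

~~~bash
tail -n 2 /home/ubuntu/flink-1.10.0/conf/flink-conf.yaml
# # Java options to start the JVM of all Flink processes with
# env.java.opts="-Dai.h2o.mojos.runtime.license.file=/home/ubuntu/license.sig"
~~~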

3\. Start the Local Flink Cluster

~~~bash
cd $HOME/flink-1.10.0
./bin/start-cluster.sh
# To shut the cluster down later: ./bin/stop-cluster.sh
~~~

4\. Access the Flink UI: http://${EC2_PUBLIC_DNS}:8081/#/overview
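
If you prefer a terminal check, Flink's monitoring REST API serves the same overview as JSON (assuming port 8081 is reachable from your machine):

~~~bash
curl http://${EC2_PUBLIC_DNS}:8081/overview
# Expect JSON with the Flink version plus taskmanager, slot, and job counts
~~~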

### Compile Flink Kafka MOJO ML Data Pipeline Jobs

1\. Open a new terminal to EC2, download the **Driverless AI Deployment Examples** repo, and compile the Java code for the **Flink MOJO ML Data Pipeline** jobs into a **JAR package**:

~~~bash
cd $HOME
git clone https://github.com/h2oai/dai-deployment-examples
cd $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline
mvn clean install
~~~
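
A successful build installs the job JAR under `target/`, versioned to match the pom; this is the JAR the `flink run` command below expects:

~~~bash
ls $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.10.0.jar
~~~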

## Task 2: Deploy the MOJO Scoring Pipeline to Flink and Send Predictions to Kafka

### Real-Time Scoring

1\. Submit a Flink Kafka MOJO Stream Scoring Job for execution:

~~~bash
$HOME/flink-1.10.0/bin/flink run \
    -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.RealTimePredHydCoolCondKafka \
    $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.10.0.jar \
    --topic dai-mojo-predictions \
    --bootstrap.servers ${EC2_PUBLIC_DNS}:9092 \
    --zookeeper.connect ${EC2_PUBLIC_DNS}:2181
~~~

2\. Once the Flink job finishes running, it will transition to the Completed Job List:

![DAI MOJO Flink Kafka Stream Job Finished](./images/finished-mojo-flink-kafka-stream-job.jpg)

3\. Check out the Data Pipeline diagram of the Flink Job **Deploy DAI Mojo SP within a Flink Kafka Streaming Data Pipeline**:

![DAI MOJO Flink Kafka Stream Data Pipeline](./images/mojo-flink-kafka-stream-job-data-pipeline.jpg)

4\. Open a new terminal to EC2 and view the Real-Time Scores by reading the predictions from the Kafka topic:

~~~bash
cd kafka_2.12-2.4.1

bin/kafka-console-consumer.sh --topic dai-mojo-predictions \
--bootstrap-server ${EC2_PUBLIC_DNS}:9092 \
--from-beginning
~~~
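
Because `AddKeyToPrediction` prepends the MOJO's output header to every record, each consumed message should be a `header:scores` string along these lines (column names and values here are illustrative, not actual model output):

~~~bash
# cool_cond_y.3,cool_cond_y.20,cool_cond_y.100:0.2897,0.3504,0.3598
~~~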

![Kafka Topic DAI MOJO Scores](./images/kafka-topic-mojo-scores.jpg)

## Conclusion

Congratulations, we just deployed a **Driverless AI MOJO Scoring Pipeline** within a **Flink Kafka Data Pipeline** to do **real-time scoring**. As a recap, we set up the environment on an AWS EC2 instance: we installed the Driverless AI MOJO Scoring Pipeline requirements, stood up Flink and Kafka on a single node, and prepared test data for batch and real-time scoring. With the environment ready, we used a custom Flink DaiMojoTransform RichMapFunction within a Flink real-time data pipeline to score our data and sent those predictions to a **Kafka topic** using **FlinkKafkaProducer**. By reading the Kafka topic, we were able to see the results of real-time scoring.
Binary file added mojo-flink/images/finished-mojo-flink-kafka-stream-job.jpg
Binary file added mojo-flink/images/kafka-topic-mojo-scores.jpg
Binary file added mojo-flink/images/mojo-flink-kafka-stream-job-data-pipeline.jpg
