Commit
Updated README instructions to refer to Flink 1.10.0 for better compatibility with CSA

Updated the README instructions to refer to Flink 1.10.0 for better compatibility with Cloudera Streaming Analytics (CSA).
Updated some of the code in the Flink batch and real-time data pipelines for the directory path they pull input data from.
Also updated the screenshots in the README so Flink's version number doesn't appear; the UI does not appear to differ
between Flink 1.10.0 and 1.11.1.

james94 committed Sep 26, 2020
1 parent 3d0fa6c commit 44ebc55
Showing 10 changed files with 32 additions and 46 deletions.
70 changes: 28 additions & 42 deletions mojo-flink/README.md
@@ -4,23 +4,20 @@

Deploy the Driverless AI MOJO Scoring Pipeline to Apache Flink by using the MOJO2 Java Runtime API and a custom Flink RichMapFunction. This will be a Cloudera integration point for Cloudera Data Flow (CDF), particularly Cloudera Streaming Analytics (CSA). CSA is powered by Apache Flink.
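
The core of the integration is a `RichMapFunction` that loads the MOJO once per task in `open()` and then scores each incoming record in `map()`. Below is a minimal sketch of that pattern, assuming the MOJO2 Java Runtime API (`ai.h2o.mojos.runtime`) and comma-separated input rows; the class name `DaiMojoScorer` and the column-name array are illustrative, not the exact code shipped in this repo:

~~~java
import java.io.IOException;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import ai.h2o.mojos.runtime.MojoPipeline;
import ai.h2o.mojos.runtime.frame.MojoFrame;
import ai.h2o.mojos.runtime.frame.MojoFrameBuilder;
import ai.h2o.mojos.runtime.frame.MojoRowBuilder;
import ai.h2o.mojos.runtime.lic.LicenseException;

// Illustrative sketch: score one CSV row per Flink record with a Driverless AI MOJO.
public class DaiMojoScorer extends RichMapFunction<String, String> {
    private final String mojoPath;       // e.g. /home/ubuntu/dai-model-deployment/mojo-pipeline/pipeline.mojo
    private final String[] inputColumns; // input feature names expected by the MOJO
    private transient MojoPipeline model;

    public DaiMojoScorer(String mojoPath, String[] inputColumns) {
        this.mojoPath = mojoPath;
        this.inputColumns = inputColumns;
    }

    @Override
    public void open(Configuration parameters) throws IOException, LicenseException {
        // MojoPipeline is not serializable, so load it on the task manager, not the client.
        // The runtime validates the license via DRIVERLESS_AI_LICENSE_FILE or DRIVERLESS_AI_LICENSE_KEY.
        model = MojoPipeline.loadFrom(mojoPath);
    }

    @Override
    public String map(String csvRow) {
        MojoFrameBuilder frameBuilder = model.getInputFrameBuilder();
        MojoRowBuilder rowBuilder = frameBuilder.getMojoRowBuilder();
        String[] values = csvRow.split(",", -1); // keep trailing empty fields
        for (int i = 0; i < inputColumns.length && i < values.length; i++) {
            rowBuilder.setValue(inputColumns[i], values[i]);
        }
        frameBuilder.addRow(rowBuilder);
        MojoFrame output = model.transform(frameBuilder.toMojoFrame());
        // Return the first output column (e.g. a class probability) for this row.
        return output.getColumn(0).getDataAsStrings()[0];
    }
}
~~~

A function like this plugs into either API, for example `stream.map(new DaiMojoScorer(path, cols))` on a `DataStream<String>`, or the equivalent call on a `DataSet<String>` for batch scoring.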

If you need to send the MOJO predictions to a Kafka topic, then read the following document: [Send Driverless AI MOJO Predictions to Kafka using FlinkKafkaProducer](./daimojo-flink-kafka.md)

## Video Walkthrough

The following link is a YouTube video that shows how to deploy the Driverless AI MOJO to Flink for batch and real-time scoring on Hydraulic System data to classify the Hydraulic Cooling Condition: [Flink Custom RichMapFunction for Running the Driverless AI MOJO in Flink Data Pipeline](https://youtu.be/RU6Q4UrhCEs)

## Prerequisites

- - Driverless AI Environment (Tested with Driverless AI 1.8.7.1, MOJO Scoring Pipeline 2.4.2)
+ - Driverless AI Environment (Tested with Driverless AI 1.9.0, MOJO Scoring Pipeline 2.4.8)

- Launch Ubuntu 18.04 Linux EC2 instance
- Instance Type: t2.2xlarge
- - Storage: 256GB
+ - Storage: 128GB
- Open custom TCP port 8081 with source 0.0.0.0/0
- Download the Driverless AI Deployment Repo to your local machine since we will be using the Flink Data Pipeline Java programs that come in the mojo-flink/ folder.

~~~bash
git clone -b mojo-flink https://github.com/james94/dai-deployment-examples/
~~~

## Task 1: Set Up Environment

@@ -39,17 +36,17 @@ chmod 400 $HOME/.ssh/{private-key-filename}.pem
# For Mac OS X, set permanent environment variables
tee -a $HOME/.bash_profile << EOF
# Set EC2 Public DNS
- export DAI_MOJO_FLINK_INSTANCE={EC2 Public DNS}.compute.amazon.com
+ export DAI_MOJO_CDF_INSTANCE={EC2 Public DNS}.compute.amazon.com
# Set EC2 Pem Key
- export DAI_MOJO_FLINK_PEM=$HOME/.ssh/{private-key-filename}.pem
+ export DAI_MOJO_CDF_PEM=$HOME/.ssh/{private-key-filename}.pem
EOF

# For Linux, set permanent environment variables
tee -a $HOME/.profile << EOF
# Set EC2 Public DNS
- export DAI_MOJO_FLINK_INSTANCE={EC2 Public DNS}.compute.amazon.com
+ export DAI_MOJO_CDF_INSTANCE={EC2 Public DNS}.compute.amazon.com
# Set EC2 Pem Key
- export DAI_MOJO_FLINK_PEM=$HOME/.ssh/{private-key-filename}.pem
+ export DAI_MOJO_CDF_PEM=$HOME/.ssh/{private-key-filename}.pem
EOF

source $HOME/.bash_profile
@@ -59,7 +56,7 @@ source $HOME/.bash_profile

~~~bash
# Connect to EC2 instance using SSH
- ssh -i $DAI_MOJO_FLINK_PEM ubuntu@$DAI_MOJO_FLINK_INSTANCE
+ ssh -i $DAI_MOJO_CDF_PEM ubuntu@$DAI_MOJO_CDF_INSTANCE
~~~

### Create Environment Directory Structure
@@ -68,12 +65,7 @@ ssh -i $DAI_MOJO_FLINK_PEM ubuntu@$DAI_MOJO_FLINK_INSTANCE

~~~bash
# Create directory structure for DAI MOJO Flink Projects
-
- # Create directory where the mojo-pipeline/ folder will be stored
- mkdir $HOME/daimojo-flink/
-
- # Create input directory used for batch scoring or real-time scoring
- mkdir -p $HOME/daimojo-flink/testData/{test-batch-data,test-real-time-data}
+ mkdir -p $HOME/dai-model-deployment/testData/{test-batch-data,test-real-time-data}
~~~

### Set Up Driverless AI MOJO Requirements in EC2
@@ -99,14 +91,14 @@ https://raw.githubusercontent.com/james94/driverlessai-recipes/master/data/hydra

~~~bash
# Move Driverless AI MOJO Scoring Pipeline to EC2 instance
- scp -i $DAI_MOJO_FLINK_PEM $HOME/Downloads/mojo.zip ubuntu@$DAI_MOJO_FLINK_INSTANCE:/home/ubuntu/daimojo-flink/
+ scp -i $DAI_MOJO_CDF_PEM $HOME/Downloads/mojo.zip ubuntu@$DAI_MOJO_CDF_INSTANCE:/home/ubuntu/dai-model-deployment/
~~~

- 2b. Unzip **mojo.zip**.

~~~bash
+ cd $HOME/dai-model-deployment/
sudo apt -y install unzip
- cd $HOME/daimojo-flink/
unzip mojo.zip
~~~

@@ -115,31 +107,27 @@ unzip mojo.zip
- 3a. Download and install Anaconda.

~~~bash
- # Download Anaconda
- wget https://repo.anaconda.com/archive/Anaconda3-2020.02-Linux-x86_64.sh
-
- # Install Anaconda
- bash Anaconda3-2020.02-Linux-x86_64.sh
+ # Download and install Anaconda
+ wget https://repo.anaconda.com/archive/Anaconda3-2020.02-Linux-x86_64.sh -O $HOME/anaconda.sh
+ bash $HOME/anaconda.sh
source ~/.bashrc
~~~

- 3b. Create **model-deployment** virtual environment and install the **required packages**

~~~bash
+ # Create virtual environment and install OpenJDK and Maven
conda create -y -n model-deployment python=3.6
conda activate model-deployment

- # Install Java
conda install -y -c conda-forge openjdk=8.0.192

- # Install Maven
conda install -y -c conda-forge maven
~~~

- 4\. Set the **Driverless AI License Key** as a **temporary environment variable**
+ 4\. Set a **temporary environment variable** for the **Driverless AI License File**

~~~bash
- # Set Driverless AI License Key
- export DRIVERLESS_AI_LICENSE_KEY="{license-key}"
+ scp -i $DAI_MOJO_CDF_PEM $HOME/Downloads/license.sig ubuntu@$DAI_MOJO_CDF_INSTANCE:/home/ubuntu/
+ export DRIVERLESS_AI_LICENSE_FILE="/home/ubuntu/license.sig"
~~~

### Prepare Hydraulic Test Data For Mojo Flink Scoring
@@ -181,17 +169,15 @@ rm -rf example.csv

~~~bash
cd $HOME
- # Download Flink
- wget https://downloads.apache.org/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz
-
- # Extract Flink tgz
- tar -xvf flink-1.11.1-bin-scala_2.12.tgz
+ # Download Flink and then extract Flink tgz
+ wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.12.tgz
+ tar xzf flink-1.10.0-bin-scala_2.12.tgz
~~~

2\. Start the Local Flink Cluster

~~~bash
- cd $HOME/flink-1.11.1
+ cd $HOME/flink-1.10.0

# Start Flink
./bin/start-cluster.sh
@@ -200,7 +186,7 @@
./bin/stop-cluster.sh
~~~

- 3\. Access the Flink UI: http://localhost:8081/#/overview
+ 3\. Access the Flink UI: http://${EC2_PUBLIC_DNS}:8081/#/overview

![Flink UI Overview](./images/flink-ui-overview.jpg)

@@ -210,7 +196,7 @@

~~~bash
cd $HOME
- git clone -b mojo-flink https://github.com/james94/dai-deployment-examples/
+ git clone https://github.com/h2oai/dai-deployment-examples
~~~

2\. Compile the Java code for **Flink MOJO ML Data Pipeline** jobs into a **JAR package**
@@ -229,7 +215,7 @@ For Driverless AI MOJO batch scoring in Flink, we will run the following Flink M
1\. Run the following command to submit the **Flink MOJO Batch Scoring Job** to the Flink Cluster JobManager for execution:

~~~bash
- $HOME/flink-1.11.1/bin/flink run -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.BatchPredHydCoolCond $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.11.1.jar
+ $HOME/flink-1.10.0/bin/flink run -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.BatchPredHydCoolCond $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.10.0.jar
~~~

2\. Once the Flink job finishes running, it will transition to the Completed Job List:
@@ -255,7 +241,7 @@
1\. Run the following command to submit the Flink MOJO Stream Scoring Job for execution:

~~~bash
- $HOME/flink-1.11.1/bin/flink run -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.RealTimePredHydCoolCond $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.11.1.jar
+ $HOME/flink-1.10.0/bin/flink run -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.RealTimePredHydCoolCond $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.10.0.jar
~~~

2\. Once the Flink job finishes running, it will transition to the Completed Job List:
@@ -29,11 +29,11 @@ public class BatchPredHydCoolCond {
public static void main(String[] args) throws IOException, LicenseException, JobExecutionException {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

- File pathToDaiMojo = new File(homePath + "/daimojo-flink/mojo-pipeline/pipeline.mojo");
+ File pathToDaiMojo = new File(homePath + "/dai-model-deployment/mojo-pipeline/pipeline.mojo");

DataSet<String> hydPredHeader = getPredHeader(pathToDaiMojo, env, "Get Pred Header via DAI MOJO");

- String pathToHydraulicData = homePath + "/daimojo-flink/testData/test-batch-data/example.csv";
+ String pathToHydraulicData = homePath + "/dai-model-deployment/testData/test-batch-data/example.csv";
DataSet<String> hydraulic = getBatchData(env, pathToHydraulicData, "Get Batch Data");

DataSet<String> hydHeadFiltered = filterOutHeader(true, hydraulic, "Filter Out Input Header");
@@ -26,11 +26,11 @@ public class RealTimePredHydCoolCond {
public static void main(String[] args) throws IOException, LicenseException, JobExecutionException {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

- File pathToDaiMojo = new File(homePath + "/daimojo-flink/mojo-pipeline/pipeline.mojo");
+ File pathToDaiMojo = new File(homePath + "/dai-model-deployment/mojo-pipeline/pipeline.mojo");

DataStream<String> hydPredHeader = getPredHeader(pathToDaiMojo, env, "Get Pred Header via DAI MOJO");

- String pathToHydraulicData = homePath + "/daimojo-flink/testData/test-real-time-data/";
+ String pathToHydraulicData = homePath + "/dai-model-deployment/testData/test-real-time-data/";
DataStream<String> hydraulic = getRealTimeData(env, pathToHydraulicData, "Get Real-Time Data");

DataStream<String> predHydraulic = predictRealTimeData(hydraulic, pathToDaiMojo, "Run DAI Mojo Real-Time Scoring");
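
For context on the path change above: the real-time pipeline watches the test-real-time-data/ directory and keeps scoring files as they land there. The helper `getRealTimeData` appears here only at its call site; a plausible implementation, assuming it uses Flink's continuous file reader (the class name `RealTimeSource` and the 1000 ms scan interval are illustrative, and the repo's actual code may differ):

~~~java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public final class RealTimeSource {
    // Hypothetical sketch of getRealTimeData: continuously monitor a directory
    // and emit each new line as a stream record (Flink 1.10 file-source API).
    public static DataStream<String> getRealTimeData(StreamExecutionEnvironment env,
                                                     String dirPath, String name) {
        TextInputFormat format = new TextInputFormat(new Path(dirPath));
        return env
            .readFile(format, dirPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L)
            .name(name); // re-scan the directory for new files every 1000 ms
    }
}
~~~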
Binary file modified mojo-flink/images/dai-mojo-flink-batch-job-etl-pipeline.jpg
Binary file modified mojo-flink/images/dai-mojo-flink-batch-scores.jpg
Binary file modified mojo-flink/images/dai-mojo-flink-stream-job-data-pipeline.jpg
Binary file modified mojo-flink/images/dai-mojo-flink-stream-scores.jpg
Binary file modified mojo-flink/images/finished-dai-mojo-flink-batch-job.jpg
Binary file modified mojo-flink/images/finished-dai-mojo-flink-stream-job.jpg
