diff --git a/mojo-flink/README.md b/mojo-flink/README.md
index 861000c..0628244 100644
--- a/mojo-flink/README.md
+++ b/mojo-flink/README.md
@@ -4,23 +4,20 @@
 
 Deploy the Driverless AI MOJO Scoring Pipeline to Apache Flink by using the MOJO2 Java Runtime API and a custom Flink RichMapFunction. This will be a Cloudera Integration point for Cloudera Data Flow (CDF), particulary Cloudera Streaming Analytics(CSA). CSA is powered by Apache Flink.
 
+If you need to send the MOJO predictions to a Kafka topic, then read the following document: [Send Driverless AI MOJO Predictions to Kafka using FlinkKafkaProducer](./daimojo-flink-kafka.md)
+
 ## Video Walkthrough
 
 The following link is a YouTube video that shows how to deploy the Driverless AI MOJO to Flink to do batch and real-time scoring on Hydraulic System data to classify for Hydraulic Cooling Condition: [Flink Custom RichMapFunction for Running the Driverless AI MOJO in Flink Data Pipeline](https://youtu.be/RU6Q4UrhCEs)
 
 ## Prerequisites
 
-- Driverless AI Environment (Tested with Driverless AI 1.8.7.1, MOJO Scoring Pipeline 2.4.2)
+- Driverless AI Environment (Tested with Driverless AI 1.9.0, MOJO Scoring Pipeline 2.4.8)
 - Launch Ubuntu 18.04 Linux EC2 instance
     - Instance Type: t2.2xlarge
-    - Storage: 256GB
+    - Storage: 128GB
     - Open custom TCP port 8081 and source on 0.0.0.0/0
 
-- Download the Driverless AI Deployment Repo to your local machine since we will be using the Flink Data Pipeline java programs that come with mojo-flink/ folder.
-
-~~~bash
-git clone -b mojo-flink https://github.com/james94/dai-deployment-examples/
-~~~
 
 ## Task 1: Set Up Environment
 
@@ -39,17 +36,17 @@ chmod 400 $HOME/.ssh/{private-key-filename}.pem
 # For Mac OS X, set permanent environment variables
 tee -a $HOME/.bash_profile << EOF
 # Set EC2 Public DNS
-export DAI_MOJO_FLINK_INSTANCE={EC2 Public DNS}.compute.amazon.com
+export DAI_MOJO_CDF_INSTANCE={EC2 Public DNS}.compute.amazon.com
 # Set EC2 Pem Key
-export DAI_MOJO_FLINK_PEM=$HOME/.ssh/{private-key-filename}.pem
+export DAI_MOJO_CDF_PEM=$HOME/.ssh/{private-key-filename}.pem
 EOF
 
 # For Linux, set permanent environment variables
 tee -a $HOME/.profile << EOF
 # Set EC2 Public DNS
-export DAI_MOJO_FLINK_INSTANCE={EC2 Public DNS}.compute.amazon.com
+export DAI_MOJO_CDF_INSTANCE={EC2 Public DNS}.compute.amazon.com
 # Set EC2 Pem Key
-export DAI_MOJO_FLINK_PEM=$HOME/.ssh/{private-key-filename}.pem
+export DAI_MOJO_CDF_PEM=$HOME/.ssh/{private-key-filename}.pem
 EOF
 
 source $HOME/.bash_profile
@@ -59,7 +56,7 @@ source $HOME/.bash_profile
 
 ~~~bash
 # Connect to EC2 instance using SSH
-ssh -i $DAI_MOJO_FLINK_PEM ubuntu@$DAI_MOJO_FLINK_INSTANCE
+ssh -i $DAI_MOJO_CDF_PEM ubuntu@$DAI_MOJO_CDF_INSTANCE
 ~~~
 
 ### Create Environment Directory Structure
@@ -68,12 +65,7 @@ ssh -i $DAI_MOJO_FLINK_PEM ubuntu@$DAI_MOJO_FLINK_INSTANCE
 
 ~~~bash
 # Create directory structure for DAI MOJO Flink Projects
-
-# Create directory where the mojo-pipeline/ folder will be stored
-mkdir $HOME/daimojo-flink/
-
-# Create input directory used for batch scoring or real-time scoring
-mkdir -p $HOME/daimojo-flink/testData/{test-batch-data,test-real-time-data}
+mkdir -p $HOME/dai-model-deployment/testData/{test-batch-data,test-real-time-data}
 ~~~
 
 ### Set Up Driverless AI MOJO Requirements in EC2
@@ -99,14 +91,14 @@ https://raw.githubusercontent.com/james94/driverlessai-recipes/master/data/hydra
 
 ~~~bash
 # Move Driverless AI MOJO Scoring Pipeline to EC2 instance
-scp -i $DAI_MOJO_FLINK_PEM $HOME/Downloads/mojo.zip ubuntu@$DAI_MOJO_FLINK_INSTANCE:/home/ubuntu/daimojo-flink/
+scp -i $DAI_MOJO_CDF_PEM $HOME/Downloads/mojo.zip ubuntu@$DAI_MOJO_CDF_INSTANCE:/home/ubuntu/dai-model-deployment/
 ~~~
 
 - 2b. Unzip **mojo.zip**.
 
 ~~~bash
+cd $HOME/dai-model-deployment/
 sudo apt -y install unzip
-cd $HOME/daimojo-flink/
 unzip mojo.zip
 ~~~
 
@@ -115,31 +107,27 @@ unzip mojo.zip
 - 3a. Download and install Anaconda.
 
 ~~~bash
-# Download Anaconda
-wget https://repo.anaconda.com/archive/Anaconda3-2020.02-Linux-x86_64.sh
-
-# Install Anaconda
-bash Anaconda3-2020.02-Linux-x86_64.sh
+# Download and install Anaconda
+wget https://repo.anaconda.com/archive/Anaconda3-2020.02-Linux-x86_64.sh -O $HOME/anaconda.sh
+bash $HOME/anaconda.sh
+source ~/.bashrc
 ~~~
 
 - 3b. Create **model-deployment** virtual environment and install the **required packages**
 
 ~~~bash
+# create virtual environment and install openjdk and maven
 conda create -y -n model-deployment python=3.6
 conda activate model-deployment
-
-# Install Java
 conda install -y -c conda-forge openjdk=8.0.192
-
-# Install Maven
 conda install -y -c conda-forge maven
 ~~~
 
-4\. Set the **Driverless AI License Key** as a **temporary environment variable**
+4\. Set **temporary environment variable** for **Driverless AI License File**
 
 ~~~bash
-# Set Driverless AI License Key
-export DRIVERLESS_AI_LICENSE_KEY="{license-key}"
+scp -i $DAI_MOJO_CDF_PEM $HOME/Downloads/license.sig ubuntu@$DAI_MOJO_CDF_INSTANCE:/home/ubuntu/
+export DRIVERLESS_AI_LICENSE_FILE="/home/ubuntu/license.sig"
 ~~~
 
 ### Prepare Hydraulic Test Data For Mojo Flink Scoring
@@ -181,17 +169,15 @@ rm -rf example.csv
 ~~~bash
 cd $HOME
 
-# Download Flink
-wget https://downloads.apache.org/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz
-
-# Extract Flink tgz
-tar -xvf flink-1.11.1-bin-scala_2.12.tgz
+# Download Flink and then extract Flink tgz
+wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.12.tgz
+tar xzf flink-1.10.0-bin-scala_2.12.tgz
 ~~~
 
 2\. Start the Local Flink Cluster
 
 ~~~bash
-cd $HOME/flink-1.11.1
+cd $HOME/flink-1.10.0
 
 # Start Flink
 ./bin/start-cluster.sh
@@ -200,7 +186,7 @@ cd $HOME/flink-1.11.1
 ./bin/stop-cluster.sh
 ~~~
 
-3\. Access the Flink UI: http://localhost:8081/#/overview
+3\. Access the Flink UI: http://${EC2_PUBLIC_DNS}:8081/#/overview
 
 ![Flink UI Overview](./images/flink-ui-overview.jpg)
 
@@ -210,7 +196,7 @@ cd $HOME/flink-1.11.1
 
 ~~~bash
 cd $HOME
-git clone -b mojo-flink https://github.com/james94/dai-deployment-examples/
+git clone https://github.com/h2oai/dai-deployment-examples
 ~~~
 
 2\. Compile the Java code for **Flink MOJO ML Data Pipeline** jobs into a **JAR package**
@@ -229,7 +215,7 @@ For Driverless AI MOJO batch scoring in Flink, we will run the following Flink M
 1\. Run the following command to submit the **Flink MOJO Batch Scoring Job** to the Flink Cluster JobManager for execution:
 
 ~~~bash
-$HOME/flink-1.11.1/bin/flink run -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.BatchPredHydCoolCond $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.11.1.jar
+$HOME/flink-1.10.0/bin/flink run -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.BatchPredHydCoolCond $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.10.0.jar
 ~~~
 
 2\. Once the Flink job finishes running, it will transition to the Completed Job List:
@@ -255,7 +241,7 @@ For Driverless AI MOJO batch scoring in Flink, we will run the following Flink M
 1\. Run the following command to submit the Flink MOJO Stream Scoring Job for execution:
 
 ~~~bash
-$HOME/flink-1.11.1/bin/flink run -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.RealTimePredHydCoolCond $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.11.1.jar
+$HOME/flink-1.10.0/bin/flink run -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.RealTimePredHydCoolCond $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.10.0.jar
 ~~~
 
 2\. Once the Flink job finishes running, it will transition to the Completed Job List:
diff --git a/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/BatchPredHydCoolCond.java b/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/BatchPredHydCoolCond.java
index ade8292..9c278e9 100644
--- a/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/BatchPredHydCoolCond.java
+++ b/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/BatchPredHydCoolCond.java
@@ -29,11 +29,11 @@ public class BatchPredHydCoolCond {
     public static void main(String[] args) throws IOException, LicenseException, JobExecutionException {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-        File pathToDaiMojo = new File(homePath + "/daimojo-flink/mojo-pipeline/pipeline.mojo");
+        File pathToDaiMojo = new File(homePath + "/dai-model-deployment/mojo-pipeline/pipeline.mojo");
 
         DataSet hydPredHeader = getPredHeader(pathToDaiMojo, env, "Get Pred Header via DAI MOJO");
 
-        String pathToHydraulicData = homePath + "/daimojo-flink/testData/test-batch-data/example.csv";
+        String pathToHydraulicData = homePath + "/dai-model-deployment/testData/test-batch-data/example.csv";
         DataSet hydraulic = getBatchData(env, pathToHydraulicData, "Get Batch Data");
 
         DataSet hydHeadFiltered = filterOutHeader(true, hydraulic, "Filter Out Input Header");
diff --git a/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/RealTimePredHydCoolCond.java b/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/RealTimePredHydCoolCond.java
index 5671e59..9c3b9ec 100644
--- a/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/RealTimePredHydCoolCond.java
+++ b/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/RealTimePredHydCoolCond.java
@@ -26,11 +26,11 @@ public class RealTimePredHydCoolCond {
     public static void main(String[] args) throws IOException, LicenseException, JobExecutionException {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        File pathToDaiMojo = new File(homePath + "/daimojo-flink/mojo-pipeline/pipeline.mojo");
+        File pathToDaiMojo = new File(homePath + "/dai-model-deployment/mojo-pipeline/pipeline.mojo");
 
         DataStream hydPredHeader = getPredHeader(pathToDaiMojo, env, "Get Pred Header via DAI MOJO");
 
-        String pathToHydraulicData = homePath + "/daimojo-flink/testData/test-real-time-data/";
+        String pathToHydraulicData = homePath + "/dai-model-deployment/testData/test-real-time-data/";
         DataStream hydraulic = getRealTimeData(env, pathToHydraulicData, "Get Real-Time Data");
 
         DataStream predHydraulic = predictRealTimeData(hydraulic, pathToDaiMojo, "Run DAI Mojo Real-Time Scoring");
diff --git a/mojo-flink/images/dai-mojo-flink-batch-job-etl-pipeline.jpg b/mojo-flink/images/dai-mojo-flink-batch-job-etl-pipeline.jpg
index e606e9f..ac49b8a 100644
Binary files a/mojo-flink/images/dai-mojo-flink-batch-job-etl-pipeline.jpg and b/mojo-flink/images/dai-mojo-flink-batch-job-etl-pipeline.jpg differ
diff --git a/mojo-flink/images/dai-mojo-flink-batch-job-taskmanager-log.jpg b/mojo-flink/images/dai-mojo-flink-batch-job-taskmanager-log.jpg
index 58a75ee..d7b3bf5 100644
Binary files a/mojo-flink/images/dai-mojo-flink-batch-job-taskmanager-log.jpg and b/mojo-flink/images/dai-mojo-flink-batch-job-taskmanager-log.jpg differ
diff --git a/mojo-flink/images/dai-mojo-flink-batch-scores.jpg b/mojo-flink/images/dai-mojo-flink-batch-scores.jpg
index 8d5f931..c31c70c 100644
Binary files a/mojo-flink/images/dai-mojo-flink-batch-scores.jpg and b/mojo-flink/images/dai-mojo-flink-batch-scores.jpg differ
diff --git a/mojo-flink/images/dai-mojo-flink-stream-job-data-pipeline.jpg b/mojo-flink/images/dai-mojo-flink-stream-job-data-pipeline.jpg
index 40edbfb..fb4bfae 100644
Binary files a/mojo-flink/images/dai-mojo-flink-stream-job-data-pipeline.jpg and b/mojo-flink/images/dai-mojo-flink-stream-job-data-pipeline.jpg differ
diff --git a/mojo-flink/images/dai-mojo-flink-stream-scores.jpg b/mojo-flink/images/dai-mojo-flink-stream-scores.jpg
index ea62222..aefcb51 100644
Binary files a/mojo-flink/images/dai-mojo-flink-stream-scores.jpg and b/mojo-flink/images/dai-mojo-flink-stream-scores.jpg differ
diff --git a/mojo-flink/images/finished-dai-mojo-flink-batch-job.jpg b/mojo-flink/images/finished-dai-mojo-flink-batch-job.jpg
index 13eb437..1a453d2 100644
Binary files a/mojo-flink/images/finished-dai-mojo-flink-batch-job.jpg and b/mojo-flink/images/finished-dai-mojo-flink-batch-job.jpg differ
diff --git a/mojo-flink/images/finished-dai-mojo-flink-stream-job.jpg b/mojo-flink/images/finished-dai-mojo-flink-stream-job.jpg
index 883ac2f..24e09a2 100644
Binary files a/mojo-flink/images/finished-dai-mojo-flink-stream-job.jpg and b/mojo-flink/images/finished-dai-mojo-flink-stream-job.jpg differ