diff --git a/mojo-flink/daimojo-flink-data-pipeline/pom.xml b/mojo-flink/daimojo-flink-data-pipeline/pom.xml
index 2aa3cdd..bebf2b7 100644
--- a/mojo-flink/daimojo-flink-data-pipeline/pom.xml
+++ b/mojo-flink/daimojo-flink-data-pipeline/pom.xml
@@ -4,20 +4,20 @@
 	<groupId>daimojo-flink-data-pipeline</groupId>
 	<artifactId>daimojo-flink-data-pipeline</artifactId>
-	<version>1.11.1</version>
+	<version>1.10.0</version>
 	<packaging>jar</packaging>
 
 	<name>Driverless AI MOJO Flink Job</name>
 
 	<properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<flink.version>1.11.1</flink.version>
+		<flink.version>1.10.0</flink.version>
 		<java.version>1.8</java.version>
 		<scala.binary.version>2.12</scala.binary.version>
 		<maven.compiler.source>${java.version}</maven.compiler.source>
 		<maven.compiler.target>${java.version}</maven.compiler.target>
 		<log4j.version>2.12.1</log4j.version>
-		<daimojo.version>2.4.2</daimojo.version>
+		<daimojo.version>2.4.8</daimojo.version>
@@ -55,8 +55,13 @@
 			<version>${flink.version}</version>
 			<scope>provided</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
-
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
 			<artifactId>log4j-slf4j-impl</artifactId>
@@ -78,7 +83,7 @@
 			<artifactId>opencsv</artifactId>
 			<version>[5.2]</version>
 		</dependency>
-		<!-- … -->
+		<!-- … -->
 		<dependency>
 			<groupId>ai.h2o</groupId>
@@ -90,7 +95,7 @@
 			<groupId>ai.h2o</groupId>
 			<artifactId>mojo2-runtime-impl</artifactId>
 			<version>${daimojo.version}</version>
-		<!-- … -->
+		<!-- … -->
 		</dependency>
@@ -126,7 +131,6 @@
 									<exclude>org.apache.flink:force-shading</exclude>
 									<exclude>com.google.code.findbugs:jsr305</exclude>
 									<exclude>org.slf4j:*</exclude>
-									<exclude>org.apache.logging.log4j:*</exclude>
 								</excludes>
diff --git a/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/AddKeyToPrediction.java b/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/AddKeyToPrediction.java
new file mode 100644
index 0000000..fda2ff0
--- /dev/null
+++ b/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/AddKeyToPrediction.java
@@ -0,0 +1,46 @@
+package org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+
+import com.opencsv.exceptions.CsvValidationException;
+
+import ai.h2o.mojos.runtime.MojoPipeline;
+import ai.h2o.mojos.runtime.api.MojoPipelineService;
+import ai.h2o.mojos.runtime.frame.MojoFrameMeta;
+import ai.h2o.mojos.runtime.lic.LicenseException;
+
+@SuppressWarnings("serial")
+public class AddKeyToPrediction extends RichMapFunction<String, String> {
+	private MojoPipeline model;
+	private final File pipelineMojoFilePath;
+	private MojoFrameMeta predMojoFrameMeta;
+	private String predHeaderKey;
+
+	public AddKeyToPrediction(File pipelineMojoPath) {
+		pipelineMojoFilePath = pipelineMojoPath;
+	}
+
+	/*
+	 * Initialization method for the function.
+	 * Called once before the actual working method (map); suitable for one-time setup.
+	 */
+	@Override
+	public void open(Configuration parameters) throws IOException, LicenseException {
+		model = MojoPipelineService.loadPipeline(pipelineMojoFilePath);
+		predMojoFrameMeta = model.getOutputMeta();
+		// Turn the MOJO's output column names into a comma-separated header key,
+		// e.g. "[a, b, c]" becomes "a,b,c"
+		predHeaderKey = Arrays.toString(predMojoFrameMeta.getColumnNames());
+		predHeaderKey = predHeaderKey.replaceAll("\\[", "").replaceAll("\\]", "").replaceAll("\\s+", "");
+	}
+
+	@Override
+	public String map(String inputData) throws CsvValidationException, IOException, LicenseException {
+		String addKeyToString = predHeaderKey + ":" + inputData;
+		System.out.println(addKeyToString);
+		return addKeyToString;
+	}
+}
diff --git a/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/RealTimePredHydCoolCondKafka.java b/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/RealTimePredHydCoolCondKafka.java
new file mode 100644
index 0000000..f2616d1
--- /dev/null
+++ b/mojo-flink/daimojo-flink-data-pipeline/src/main/java/org/apache/h2o/daimojo/flink/datapipeline/ClassifyHydCoolCond/RealTimePredHydCoolCondKafka.java
@@ -0,0 +1,78 @@
+package org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond;
+
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+/**
+ * Flink Stream Scoring Job.
+ *
+ * Executes an H2O.ai Driverless AI MOJO Scoring Pipeline on a stream (real-time) of Hydraulic
+ * Sensor data to classify the Hydraulic Cooling Condition, then pushes the predictions to a Kafka topic.
+ */
+@SuppressWarnings("deprecation")
+public class RealTimePredHydCoolCondKafka {
+
+	private static final String homePath = System.getProperty("user.home");
+
+	public static void main(String[] args) throws JobExecutionException {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		ParameterTool parameter = ParameterTool.fromArgs(args);
+
+		File pathToDaiMojo = new File(homePath + "/dai-model-deployment/mojo-pipeline/pipeline.mojo");
+		String pathToHydraulicData = homePath + "/dai-model-deployment/testData/test-real-time-data/";
+		DataStream<String> hydraulic = getRealTimeData(env, pathToHydraulicData, "Get Real-Time Data");
+
+		DataStream<String> predHydraulic = predictRealTimeData(hydraulic, pathToDaiMojo, "Run DAI Mojo Real-Time Scoring");
+
+		DataStream<String> streamKeyedScores = addKeyToPrediction(predHydraulic, pathToDaiMojo, "Add Key To Prediction for Kafka");
+
+		String sinkTopic = parameter.getRequired("topic");
+		String bootstrapServers = parameter.getRequired("bootstrap.servers");
+		String zookeeperConnect = parameter.getRequired("zookeeper.connect");
+
+		Properties properties = new Properties();
+		properties.setProperty("bootstrap.servers", bootstrapServers);
+		properties.setProperty("groupId", sinkTopic);
+		properties.setProperty("zookeeper.connect", zookeeperConnect);
+		properties.setProperty("parse.key", "true");
+		properties.setProperty("key.separator", ":");
+
+		FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>(
+				sinkTopic, new SimpleStringSchema(), properties);
+
+		streamKeyedScores.addSink(flinkKafkaProducer).name("Push RT Scores to Kafka Topic");
+
+		executeFlinkStreamJob(env, "Deploy DAI Mojo SP within a Flink Kafka Streaming Data Pipeline");
+	}
+
+	/* Pull in real-time data: each file in the directory holds an individual row of data */
+	private static DataStream<String> getRealTimeData(StreamExecutionEnvironment env, String pathToData, String operatorName) {
+		return env.readTextFile(pathToData).name(operatorName);
+	}
+
+	/* Perform predictions on incoming hydraulic system data using the Driverless AI MOJO Scoring Pipeline */
+	private static DataStream<String> predictRealTimeData(DataStream<String> dataStream, File pathToMojoScorer, String operatorName) {
+		DaiMojoTransform daiMojoTransform = new DaiMojoTransform(pathToMojoScorer);
+		return dataStream.map(daiMojoTransform).name(operatorName);
+	}
+
+	/* Prefix each prediction with the output-column header so downstream consumers can parse it */
+	private static DataStream<String> addKeyToPrediction(DataStream<String> dataStream, File pathToMojoScorer, String operatorName) {
+		AddKeyToPrediction addKeyToData = new AddKeyToPrediction(pathToMojoScorer);
+		return dataStream.map(addKeyToData).name(operatorName);
+	}
+
+	/* Throw a more specific exception, since env.execute() declares a plain Exception */
+	private static void executeFlinkStreamJob(StreamExecutionEnvironment env, String jobName) throws JobExecutionException {
+		try {
+			env.execute(jobName);
+		} catch (Exception e) {
+			throw new JobExecutionException(null, jobName, e);
+		}
+	}
+}
diff --git a/mojo-flink/daimojo-flink-kafka.md b/mojo-flink/daimojo-flink-kafka.md
new file mode 100644
index 0000000..323c8b2
--- /dev/null
+++ b/mojo-flink/daimojo-flink-kafka.md
@@ -0,0 +1,199 @@
+# Send Driverless AI MOJO Predictions to Kafka using FlinkKafkaProducer
+
+## Cloudera Integration Point for CDF
+
+Deploy the Driverless AI MOJO Scoring Pipeline to Apache Flink by using the MOJO2 Java Runtime API and a custom Flink RichMapFunction, then send the predictions performed by the MOJO to a Kafka topic using FlinkKafkaProducer. This serves as a Cloudera integration point for Cloudera Data Flow (CDF), particularly Cloudera Streaming Analytics (CSA), which is powered by Apache Flink.
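+
+The MOJO scoring step in the Flink job is the custom `DaiMojoTransform` RichMapFunction, which already lives in this repository and is not touched by the diff above. For orientation, here is a minimal sketch of such a MOJO-scoring RichMapFunction written against the public MOJO2 Java Runtime API. The class name and the simplified CSV handling are our own, not the actual `DaiMojoTransform` source, and it assumes each incoming line carries the input fields in the MOJO's input-column order (which matches the header-less one-row test files prepared in Task 1 below):
+
+~~~java
+import java.io.File;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import ai.h2o.mojos.runtime.MojoPipeline;
+import ai.h2o.mojos.runtime.api.MojoPipelineService;
+import ai.h2o.mojos.runtime.frame.MojoFrame;
+import ai.h2o.mojos.runtime.frame.MojoFrameBuilder;
+import ai.h2o.mojos.runtime.frame.MojoRowBuilder;
+
+/** Sketch of a MOJO-scoring RichMapFunction: load the pipeline once, then score row by row. */
+public class MojoScoringSketch extends RichMapFunction<String, String> {
+    private final File mojoPath;
+    private transient MojoPipeline model;
+
+    public MojoScoringSketch(File mojoPath) { this.mojoPath = mojoPath; }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // Loaded once per task, before any map() call
+        model = MojoPipelineService.loadPipeline(mojoPath);
+    }
+
+    @Override
+    public String map(String csvRow) throws Exception {
+        // Fill one input row; assumes the fields arrive in the MOJO's input-column order
+        MojoFrameBuilder frameBuilder = model.getInputFrameBuilder();
+        MojoRowBuilder rowBuilder = frameBuilder.getMojoRowBuilder();
+        String[] values = csvRow.split(",", -1);
+        String[] columns = model.getInputMeta().getColumnNames();
+        for (int i = 0; i < columns.length; i++) {
+            rowBuilder.setValue(columns[i], values[i]);
+        }
+        frameBuilder.addRow(rowBuilder);
+
+        // Score, then flatten the single-row output frame back into a CSV string
+        MojoFrame output = model.transform(frameBuilder.toMojoFrame());
+        StringBuilder scores = new StringBuilder();
+        for (int col = 0; col < output.getNcols(); col++) {
+            if (col > 0) scores.append(",");
+            scores.append(output.getColumn(col).getDataAsStrings()[0]);
+        }
+        return scores.toString();
+    }
+}
+~~~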
+
+## Video Walkthrough
+
+The following YouTube video shows how to deploy the Driverless AI MOJO to Flink to perform real-time predictions on Hydraulic System data, classify the Hydraulic Cooling Condition, and then send those predictions to a Kafka topic: [Send Driverless AI MOJO Predictions to Kafka using FlinkKafkaProducer](https://youtu.be/hqd3ebEqkuk)
+
+## Prerequisites
+
+- Driverless AI Environment (tested with Driverless AI 1.9.0, MOJO Scoring Pipeline 2.4.8)
+
+- Launch an Ubuntu 18.04 Linux EC2 instance
+    - Instance Type: t2.2xlarge
+    - Storage: 128GB
+    - Open traffic on all ports on 0.0.0.0/0
+
+## Task 1: Set Up Environment
+
+### Connect to EC2 from Local Machine
+
+~~~bash
+# Connect to EC2 instance using SSH
+ssh -i $DAI_MOJO_CDF_PEM ubuntu@$DAI_MOJO_CDF_INSTANCE
+~~~
+
+### Create Environment Directory Structure
+
+~~~bash
+mkdir -p $HOME/dai-model-deployment/testData/{test-batch-data,test-real-time-data}
+~~~
+
+### Set Up Driverless AI MOJO Requirements in EC2
+
+1\. Build a **Driverless AI Experiment**
+
+2\. Send **mojo.zip** to the EC2 instance
+
+~~~bash
+# Send DAI MOJO SP from local machine to EC2 instance
+scp -i $DAI_MOJO_CDF_PEM $HOME/Downloads/mojo.zip ubuntu@$DAI_MOJO_CDF_INSTANCE:/home/ubuntu/dai-model-deployment/
+
+# On the EC2 instance, extract mojo.zip
+cd $HOME/dai-model-deployment/
+sudo apt -y install unzip
+unzip mojo.zip
+~~~
+
+3\. Install **package dependencies** for the MOJO2 Java Runtime
+
+~~~bash
+wget https://repo.anaconda.com/archive/Anaconda3-2020.02-Linux-x86_64.sh -O $HOME/anaconda.sh
+bash $HOME/anaconda.sh
+source ~/.bashrc
+conda create -y -n model-deployment python=3.6
+conda activate model-deployment
+conda install -y -c conda-forge openjdk=8.0.192
+conda install -y -c conda-forge maven
+~~~
+
+4\. Set a **temporary environment variable** for the **Driverless AI License File**
+
+~~~bash
+scp -i $DAI_MOJO_CDF_PEM $HOME/Downloads/license.sig ubuntu@$DAI_MOJO_CDF_INSTANCE:/home/ubuntu/
+export DRIVERLESS_AI_LICENSE_FILE="/home/ubuntu/license.sig"
+~~~
+
+### Prepare Hydraulic Test Data for MOJO Flink Scoring
+
+1\. Set up batch data in the test-batch-data folder:
+
+~~~bash
+cd /home/ubuntu/dai-model-deployment/mojo-pipeline/
+cp example.csv /home/ubuntu/dai-model-deployment/testData/test-batch-data/
+~~~
+
+2\. Set up real-time data in the test-real-time-data folder. Strip the header from example.csv, then split it into one-row files so that each file simulates an individual real-time event:
+
+~~~bash
+cd /home/ubuntu/dai-model-deployment/testData/test-real-time-data/
+cp /home/ubuntu/dai-model-deployment/mojo-pipeline/example.csv .
+echo -e "$(sed '1d' example.csv)\n" > example.csv
+split -dl 1 --additional-suffix=.csv example.csv test_
+rm -rf example.csv
+~~~
+
+### Set Up Kafka Local Cluster in EC2
+
+1\. Download **Kafka**
+
+~~~bash
+cd $HOME
+wget http://apache.mirrors.hoobly.com/kafka/2.4.1/kafka_2.12-2.4.1.tgz
+tar xzf kafka_2.12-2.4.1.tgz
+cd kafka_2.12-2.4.1
+~~~
+
+2\. Start the **Kafka Environment**
+
+In the current terminal, start the ZooKeeper service:
+
+~~~bash
+# Start the ZooKeeper service
+bin/zookeeper-server-start.sh config/zookeeper.properties
+~~~
+
+Open a new terminal to EC2 and start the Kafka broker service:
+
+~~~bash
+# Start the Kafka broker service
+cd kafka_2.12-2.4.1
+
+bin/kafka-server-start.sh config/server.properties
+~~~
+
+3\. Open a new terminal to EC2 and create a **dai-mojo-predictions** Kafka topic to store your predictions:
+
+~~~bash
+cd kafka_2.12-2.4.1
+
+bin/kafka-topics.sh --create \
+    --zookeeper ${EC2_PUBLIC_DNS}:2181 \
+    --replication-factor 1 --partitions 1 \
+    --topic dai-mojo-predictions
+~~~
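+
+Optionally, you can confirm from Java that the broker is reachable and the topic exists before wiring up the Flink job. This is a hedged sketch using the standard `kafka-clients` AdminClient; it is not part of this repository's code, and the `BOOTSTRAP` address is an assumption you should replace:
+
+~~~java
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+
+public class VerifyKafkaTopic {
+    // Assumption: replace with your EC2 public DNS
+    private static final String BOOTSTRAP = "EC2_PUBLIC_DNS:9092";
+
+    public static void main(String[] args) throws ExecutionException, InterruptedException {
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
+        try (AdminClient admin = AdminClient.create(props)) {
+            // Lists all topics; expect to see dai-mojo-predictions among them
+            admin.listTopics().names().get().forEach(System.out::println);
+        }
+    }
+}
+~~~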
+
+### Set Up Flink Local Cluster in EC2
+
+1\. Download **Flink**
+
+~~~bash
+cd $HOME
+wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.12.tgz
+tar xzf flink-1.10.0-bin-scala_2.12.tgz
+~~~
+
+2\. Add the Driverless AI License File as a JVM system property for all Flink processes in `flink-conf.yaml`:
+
+~~~bash
+echo "# Java options to start the JVM of all Flink processes with" | tee -a /home/ubuntu/flink-1.10.0/conf/flink-conf.yaml
+echo "env.java.opts=\"-Dai.h2o.mojos.runtime.license.file=/home/ubuntu/license.sig\"" | tee -a /home/ubuntu/flink-1.10.0/conf/flink-conf.yaml
+~~~
+
+3\. Start the local Flink cluster:
+
+~~~bash
+cd $HOME/flink-1.10.0
+./bin/start-cluster.sh
+# To stop it later: ./bin/stop-cluster.sh
+~~~
+
+4\. Access the Flink UI: http://${EC2_PUBLIC_DNS}:8081/#/overview
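+
+The `env.java.opts` entry from step 2 is what makes the license visible inside Flink's TaskManager JVMs, where the MOJO is actually loaded; the `DRIVERLESS_AI_LICENSE_FILE` variable exported earlier only applies to your shell session. If editing `flink-conf.yaml` is not an option, the same system property can also be set programmatically before the pipeline is loaded. A minimal sketch follows; the property name is the one used in step 2, while setting it from code this way is our assumption, not something this repository does:
+
+~~~java
+import java.io.File;
+import ai.h2o.mojos.runtime.MojoPipeline;
+import ai.h2o.mojos.runtime.api.MojoPipelineService;
+
+public class LicenseFromCode {
+    public static void main(String[] args) throws Exception {
+        // Point the MOJO2 runtime at the license before loading the pipeline,
+        // e.g. inside a RichMapFunction's open() hook
+        System.setProperty("ai.h2o.mojos.runtime.license.file", "/home/ubuntu/license.sig");
+        MojoPipeline model = MojoPipelineService.loadPipeline(
+                new File("/home/ubuntu/dai-model-deployment/mojo-pipeline/pipeline.mojo"));
+        System.out.println("Loaded MOJO with output columns: "
+                + String.join(",", model.getOutputMeta().getColumnNames()));
+    }
+}
+~~~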
+
+### Compile Flink Kafka MOJO ML Data Pipeline Jobs
+
+1\. Open a new terminal to EC2, download the **Driverless AI Deployment Examples** repo, and compile the Java code for the **Flink MOJO ML Data Pipeline** jobs into a **JAR package**:
+
+~~~bash
+cd $HOME
+git clone https://github.com/h2oai/dai-deployment-examples
+cd $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline
+mvn clean install
+~~~
+
+## Task 2: Deploy MOJO Scoring Pipeline to Flink to Send to Kafka
+
+### Real-Time Scoring
+
+1\. Submit a Flink Kafka MOJO Stream Scoring Job for execution:
+
+~~~bash
+$HOME/flink-1.10.0/bin/flink run -c org.apache.h2o.daimojo.flink.datapipeline.ClassifyHydCoolCond.RealTimePredHydCoolCondKafka $HOME/dai-deployment-examples/mojo-flink/daimojo-flink-data-pipeline/target/daimojo-flink-data-pipeline-1.10.0.jar --topic dai-mojo-predictions --bootstrap.servers ${EC2_PUBLIC_DNS}:9092 --zookeeper.connect ${EC2_PUBLIC_DNS}:2181
+~~~
+
+2\. Once the Flink job finishes running, it transitions to the Completed Job List:
+
+![DAI MOJO Flink Kafka Stream Job Finished](./images/finished-mojo-flink-kafka-stream-job.jpg)
+
+3\. Check out the data pipeline diagram of the Flink job **Deploy DAI Mojo SP within a Flink Kafka Streaming Data Pipeline**:
+
+![DAI MOJO Flink Kafka Stream Data Pipeline](./images/mojo-flink-kafka-stream-job-data-pipeline.jpg)
+
+4\. Open a new terminal to EC2 and view the real-time scores by reading the predictions from the Kafka topic:
+
+~~~bash
+cd kafka_2.12-2.4.1
+
+bin/kafka-console-consumer.sh --topic dai-mojo-predictions \
+    --bootstrap-server ${EC2_PUBLIC_DNS}:9092 \
+    --from-beginning
+~~~
+
+![Kafka Topic DAI MOJO Scores](./images/kafka-topic-mojo-scores.jpg)
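+
+Each record value is the header key produced by AddKeyToPrediction, a colon, and the comma-separated scores. Note that SimpleStringSchema writes that whole string as the record value, so the Kafka record key itself is null; the `parse.key` and `key.separator` entries in the job's producer properties are kafka-console-producer options and are ignored by the producer client. To consume and split these records programmatically instead of using the console consumer, a minimal `kafka-clients` consumer sketch follows (hedged: not part of this repository; the group id and broker address are assumptions):
+
+~~~java
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+public class ReadMojoScores {
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "EC2_PUBLIC_DNS:9092"); // assumption: your broker address
+        props.put("group.id", "dai-mojo-readers");             // assumption: any unused group id
+        props.put("auto.offset.reset", "earliest");
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+            consumer.subscribe(Collections.singletonList("dai-mojo-predictions"));
+            while (true) {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+                for (ConsumerRecord<String, String> record : records) {
+                    // Split on the first ':' only; the scores themselves contain commas, not colons
+                    String[] parts = record.value().split(":", 2);
+                    System.out.println("columns = " + parts[0] + " | scores = " + parts[1]);
+                }
+            }
+        }
+    }
+}
+~~~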
+
+## Conclusion
+
+Congratulations, we just deployed a **Driverless AI MOJO Scoring Pipeline** within a **Flink Kafka Data Pipeline** to do **real-time scoring**. As a recap, we set up the environment on an AWS EC2 instance by installing the Driverless AI MOJO Scoring Pipeline requirements, standing up Flink and Kafka on a single node, and preparing test data for batch or real-time scoring. With the environment in place, we used a custom Flink DaiMojoTransform RichMapFunction within a Flink real-time data pipeline to score our data and sent those predictions to a **Kafka topic** using **FlinkKafkaProducer**. By reading the Kafka topic, we were able to see the results of real-time scoring.
\ No newline at end of file
diff --git a/mojo-flink/images/finished-mojo-flink-kafka-stream-job.jpg b/mojo-flink/images/finished-mojo-flink-kafka-stream-job.jpg
new file mode 100644
index 0000000..48bbc77
Binary files /dev/null and b/mojo-flink/images/finished-mojo-flink-kafka-stream-job.jpg differ
diff --git a/mojo-flink/images/kafka-topic-mojo-scores.jpg b/mojo-flink/images/kafka-topic-mojo-scores.jpg
new file mode 100644
index 0000000..8120860
Binary files /dev/null and b/mojo-flink/images/kafka-topic-mojo-scores.jpg differ
diff --git a/mojo-flink/images/mojo-flink-kafka-stream-job-data-pipeline.jpg b/mojo-flink/images/mojo-flink-kafka-stream-job-data-pipeline.jpg
new file mode 100644
index 0000000..1a823e9
Binary files /dev/null and b/mojo-flink/images/mojo-flink-kafka-stream-job-data-pipeline.jpg differ