diff --git a/mojo-db-udf/.gitignore b/mojo-db-udf/.gitignore new file mode 100644 index 0000000..6eb580a --- /dev/null +++ b/mojo-db-udf/.gitignore @@ -0,0 +1,85 @@ +# Created by https://www.gitignore.io/api/java,gradle,intellij +# Edit at https://www.gitignore.io/?templates=java,gradle,intellij + +### Intellij ### +.idea/ +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# File-based project format +*.iws + +# IntelliJ +out/ + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +### Intellij Patch ### +# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 + +# *.iml +# modules.xml +# .idea/misc.xml +# *.ipr + +### Java ### +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +### Gradle ### +.gradle +build/ + +# Ignore Gradle GUI config +gradle-app.setting + +# Avoid ignoring Gradle wrapper jar file (.jar files are usually ignored) +!gradle-wrapper.jar + +# Cache of project +.gradletasknamecache + +# # Work around https://youtrack.jetbrains.com/issue/IDEA-116898 +# gradle/wrapper/gradle-wrapper.properties + +### Gradle Patch ### +**/build/ + +# End of https://www.gitignore.io/api/java,gradle,intellij +# +dist/ diff --git a/mojo-db-udf/LICENSE b/mojo-db-udf/LICENSE new file mode 100644 index 0000000..7762036 --- /dev/null +++ b/mojo-db-udf/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + 
http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. 
For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019 H2O.ai + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/mojo-db-udf/build.gradle b/mojo-db-udf/build.gradle new file mode 100644 index 0000000..76250dd --- /dev/null +++ b/mojo-db-udf/build.gradle @@ -0,0 +1,47 @@ +plugins { + // Apply the java plugin to add support for Java + id 'java' + + // Apply the application plugin to add support for building an application + id 'application' +} + +description = "H2O DAI Mojo Database Scoring ${version}" + +repositories { + // Use jcenter for resolving your dependencies. + // You can declare any Maven/Ivy/file repository here. 
+    jcenter()
+}
+
+dependencies {
+    // A lightweight cmd line interface
+    implementation "info.picocli:picocli:4.0.1"
+    // Logging
+    implementation "org.apache.logging.log4j:log4j-api:${slf4jVersion}"
+    implementation "org.apache.logging.log4j:log4j-core:${slf4jVersion}"
+    implementation "org.apache.logging.log4j:log4j-slf4j-impl:${slf4jVersion}"
+    // MOJO2 Runtime
+    implementation "ai.h2o:mojo2-runtime-api:${mojoVersion}"
+    implementation "ai.h2o:mojo2-runtime-impl:${mojoVersion}"
+    // Config library
+    implementation "com.typesafe:config:${configVersion}"
+
+    // Use JUnit test framework
+    testImplementation 'junit:junit:4.12'
+    testImplementation 'com.h2database:h2:1.4.199'
+}
+
+application {
+    mainClassName = "ai.h2o.mojos.db.MojoDbScorer"
+}
+
+installDist {
+    into file("${projectDir}/dist/${project.name}")
+}
+
+clean {
+    delete file("${projectDir}/dist")
+}
+
+build.dependsOn installDist
\ No newline at end of file
diff --git a/mojo-db-udf/config/lending_club.properties b/mojo-db-udf/config/lending_club.properties
new file mode 100644
index 0000000..2976a68
--- /dev/null
+++ b/mojo-db-udf/config/lending_club.properties
@@ -0,0 +1,29 @@
+// NOTE(review): Typesafe Config chooses the parser from the file extension; confirm this HOCON content loads as intended from a .properties file
+mojo-db-scoring-app {
+  model {
+    // Location of model pipeline in MOJO format
+    file = "pipeline.mojo"
+  }
+  db {
+    // Database connection string
+    connection = "jdbc:postgresql://192.168.1.171:5432/LendingClub"
+    // Database user
+    user = "postgres"
+    // User password, Base64-encoded (SQLCommandConfig decodes it with decodeBase64)
+    password = "aDJvaDJvCg=="
+    // Whether to prompt for the password at start-up; read as a boolean, so it must be true or false
+    prompt = false
+  }
+
+  sql {
+    key = "id"
+    // Read by SQLCommandConfig as sql.predictionCol
+    predictionCol = ""
+    select = "select id, loan_amnt, term, int_rate, installment, emp_length, home_ownership, annual_inc, verification_status, addr_state, dti, delinq_2yrs, inq_last_6mths, pub_rec, revol_bal, revol_util, total_acc from 'import'.loanstats4"
+    write= "update 'import'.loanstats4 set @RESULT@ where @KEY@ = @ROWID@"
+    savePrediction = 0
+    // Field separator
+    separator = ","
+  }
+}
+
diff --git
a/mojo-db-udf/gradle.properties b/mojo-db-udf/gradle.properties
new file mode 100644
index 0000000..12693af
--- /dev/null
+++ b/mojo-db-udf/gradle.properties
@@ -0,0 +1,7 @@
+version = 0.1.0
+
+# Version of dependencies
+mojoVersion = 2.1.5
+# Despite its name this is the Log4j 2 version used in build.gradle; keep it at 2.17.1 or later (CVE-2021-44228)
+slf4jVersion = 2.17.1
+configVersion = 1.3.4
diff --git a/mojo-db-udf/gradle/wrapper/gradle-wrapper.jar b/mojo-db-udf/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..5c2d1cf
Binary files /dev/null and b/mojo-db-udf/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/mojo-db-udf/gradle/wrapper/gradle-wrapper.properties b/mojo-db-udf/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..ca9d628
--- /dev/null
+++ b/mojo-db-udf/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,5 @@
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
diff --git a/mojo-db-udf/gradlew b/mojo-db-udf/gradlew
new file mode 100755
index 0000000..b0d6d0a
--- /dev/null
+++ b/mojo-db-udf/gradlew
@@ -0,0 +1,188 @@
+#!/usr/bin/env sh
+
+#
+# Copyright 2015 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 
+ +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" 
"$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/mojo-db-udf/gradlew.bat b/mojo-db-udf/gradlew.bat new file mode 100644 index 0000000..15e1ee3 --- /dev/null +++ b/mojo-db-udf/gradlew.bat @@ -0,0 +1,100 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem http://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. 
+@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. 
+set CMD_LINE_ARGS=
+set _SKIP=2
+
+:win9xME_args_slurp
+if "x%~1" == "x" goto execute
+
+set CMD_LINE_ARGS=%*
+
+:execute
+@rem Setup the command line
+
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
+@rem Execute Gradle
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
+
+:end
+@rem End local scope for the variables with windows NT shell
+if "%ERRORLEVEL%"=="0" goto mainEnd
+
+:fail
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
+rem the _cmd.exe /c_ return code!
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
+exit /b 1
+
+:mainEnd
+if "%OS%"=="Windows_NT" endlocal
+
+:omega
diff --git a/mojo-db-udf/settings.gradle b/mojo-db-udf/settings.gradle
new file mode 100644
index 0000000..2dbd508
--- /dev/null
+++ b/mojo-db-udf/settings.gradle
@@ -0,0 +1,10 @@
+/*
+ * This file was generated by the Gradle 'init' task.
+ *
+ * The settings file is used to specify which projects to include in your build.
+ *
+ * Detailed information about configuring a multi-project build in Gradle can be found
+ * in the user manual at https://docs.gradle.org/5.3.1/userguide/multi_project_builds.html
+ */
+
+rootProject.name = 'mojo-db-udf'
diff --git a/mojo-db-udf/src/main/java/ai/h2o/mojos/db/MojoDbScorer.java b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/MojoDbScorer.java
new file mode 100644
index 0000000..bd60b50
--- /dev/null
+++ b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/MojoDbScorer.java
@@ -0,0 +1,246 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package ai.h2o.mojos.db;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Scanner;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.stream.IntStream;
+
+import ai.h2o.mojos.runtime.MojoPipeline;
+import picocli.CommandLine;
+
+import static ai.h2o.mojos.db.Utils.createConnection;
+import static ai.h2o.mojos.db.Utils.f;
+import static picocli.CommandLine.Command;
+import static picocli.CommandLine.Option;
+
+/**
+ * Command-line tool that reads rows with the configured SQL select statement and hands each
+ * row, joined into one delimited string, to a pool of {@code Worker} threads.
+ */
+@Command(name = "dbscorer")
+public class MojoDbScorer extends Args implements Callable<Integer> {
+
+  static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MojoDbScorer.class);
+
+  public static void main(String[] args) {
+    int exitCode = new CommandLine(new MojoDbScorer()).execute(args);
+    System.exit(exitCode);
+  }
+
+  /**
+   * Loads the configuration and the model, then streams the selected rows to the workers.
+   *
+   * @return 0 on success (including {@code --inspect}), -1 if the model file does not exist
+   */
+  @Override
+  public Integer call() throws Exception {
+    // Switch on verbose if necessary
+    if (verbose) {
+      // With the log4j-slf4j-impl binding the SLF4J logger factory is not a log4j-core
+      // LoggerContext, so casting it fails at runtime; use the log4j Configurator instead.
+      Configurator.setRootLevel(Level.DEBUG);
+    }
+    SQLCommandConfig cfg = new SQLCommandConfig.Builder().loadFrom(configFile).build();
+
+    if (cfg.dbPasswordPrompt) {
+      System.out.print(f("Please enter JDBC password for user %s:", cfg.dbUser));
+      BufferedReader reader =
+          new BufferedReader(new InputStreamReader(System.in));
+      cfg.dbPassword = reader.readLine();
+    }
+
+    if (!cfg.modelFile.exists()) {
+      System.out.println("Cannot locate model zip file " + cfg.modelFile);
+      return -1;
+    }
+    MojoPipeline model = MojoPipeline.loadFrom(cfg.modelFile.getAbsolutePath());
+    LOGGER.debug("Using model file: file={}", cfg.modelFile);
+
+    if (capacity <= 0) {
+      capacity = (int) Math.round((numWorkers + (numWorkers * 0.75)));
+    }
+    LOGGER.debug("TQ: capacity={}, numWorkers={}", capacity, numWorkers);
+
+    if (inspect) {
+      dumpInspect(cfg.modelFile, model);
+      return 0;
+    }
+
+    // Create a set of worker threads
+    BlockingQueue queue = new ArrayBlockingQueue(capacity);
+    Thread[] workers = new Thread[numWorkers];
+    for (int i = 0; i < workers.length; i++) {
+      (workers[i] = new Thread(new Worker(queue, model, cfg, this), "DB-Scorer-Worker-"+i)).start();
+    }
+
+    try (Connection connection = createConnection(cfg, LOGGER);
+         Statement statement = connection.createStatement();
+         ResultSet resultSet = statement.executeQuery(cfg.sqlSelectStatement)) {
+      ResultSetMetaData rsmd = resultSet.getMetaData();
+      int columnsNumber = rsmd.getColumnCount();
+      if (LOGGER.isDebugEnabled()) {
+        // JDBC column indexes are 1-based, so the names are read for 1..columnsNumber.
+        LOGGER.debug("Reading ResultSet: sqlSelect={}, colNames={}",
+            cfg.sqlSelectStatement,
+            IntStream.rangeClosed(1, columnsNumber)
+                .mapToObj(i -> getColName(rsmd, i)).toArray());
+      }
+
+      // for CSV output write header
+      if (cfg.sqlWriteFormat == SQLCommandConfig.ExportFormat.CSV) {
+        System.out.print(cfg.sqlKey + ",");
+        for (int i = 0; i < model.getOutputMeta().size(); i++) {
+          System.out.print(model.getOutputMeta().getColumnName(i));
+          if (model.getOutputMeta().size() > i + 1) {
+            System.out.print(",");
+          }
+        }
+        System.out.println();
+      }
+
+      int rowsSelected = 0;
+
+      if (wait) {
+        System.out.println("Model Ready... press Enter to start ");
+        Scanner scanner = new Scanner(System.in);
+        scanner.nextLine();
+        System.out.println("Running....");
+      }
+      while (resultSet.next()) {
+        // Join the column values of this row with the configured field separator.
+        StringBuilder row = new StringBuilder();
+        for (int i = 1; i <= columnsNumber; i++) {
+          if (i > 1) {
+            row.append(cfg.sqlFieldSeparator);
+          }
+          row.append(resultSet.getString(i));
+        }
+        queue.put(row.toString());
+        rowsSelected++;
+      }
+
+      if (stats) {
+        LOGGER.info("Total selected rows: {}", rowsSelected);
+      }
+    }
+
+    // Add special end-of-stream markers to terminate the workers
+    for (int i = 0; i < workers.length; i++) {
+      queue.put(Worker.POISON_PILL);
+    }
+    // Wait for all to finish
+    for (int i = 0; i < workers.length; i++) {
+      workers[i].join();
+    }
+
+    return 0;
+  }
+
+  /**
+   * Prints the model's input and output columns, template SQL statements, and memory/CPU
+   * figures for the current machine to standard output.
+   */
+  void dumpInspect(File modelFile, MojoPipeline model) {
+    System.out.println("Details of Model: " + modelFile.getAbsolutePath());
+    System.out.println("UUID: " + model.getUuid());
+    System.out.println("Input Features");
+    StringBuilder select = new StringBuilder();
+    for (int i = 0; i < model.getInputMeta().size(); i++) {
+      System.out.println(
+          i + " = Name: " + model.getInputMeta().getColumnName(i) + " Type: " + model
+              .getInputMeta().getColumnType(i));
+      // Separate by position, not by accumulated length, so a one-character first name is kept.
+      if (i > 0) {
+        select.append(", ");
+      }
+      select.append(model.getInputMeta().getColumnName(i));
+    }
+    System.out.println("Output Features");
+    for (int i = 0; i < model.getOutputMeta().size(); i++) {
+      System.out.println(
+          i + " = Name: " + model.getOutputMeta().getColumnName(i) + " Type: " + model
+              .getOutputMeta().getColumnType(i));
+    }
+
+    System.out.println("Suggested configuration for properties file:");
+    // NOTE(review): the hint below refers to values in <>, but these templates contain none - confirm.
+    System.out.println("\nselect , " + select + " from ");
+    System.out.println("\nupdate set where =");
+
+    System.out.println(
+        "\nChange the values in <> above and manually test before using them in the program.");
+
+    final double gb = 1073741824.0;
+    Runtime rt = Runtime.getRuntime();
+    // Heap in use is total minus free; freeMemory() alone is the unused part of the heap.
+    long memUsed = rt.totalMemory() - rt.freeMemory();
+    long
+        memorySize =
+        ((com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean())
+            .getTotalPhysicalMemorySize();
+    System.out.println("\nThe System has " + Math.round(memorySize / gb)
+                       + "GB available physically. This program is using " + Math
+                           .round(memUsed / gb)
+                       + "GB Consider adjusting -Xms and -Xmx to no more than " + Math
+                           .round((memorySize / gb) * 0.75) + "GB");
+    System.out
+        .println("The System has " + Runtime.getRuntime().availableProcessors() + " Processors.");
+  }
+
+  private String getColName(ResultSetMetaData rsmd, int index) {
+    try {
+      return rsmd.getColumnName(index);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
+
+/** Command-line options of the scorer; the annotated fields are populated by picocli. */
+class Args {
+
+  @Option(names = {"-C", "--config"}, description = "Configuration file.", required = true)
+  File configFile;
+
+  @Option(names = {"-v", "--verbose"}, description = "Verbose output.")
+  boolean verbose = false;
+
+  @Option(names = {"-w", "--wait"}, description = "Wait.")
+  boolean wait = false;
+
+  @Option(names = {"-s", "--stats"}, description = "Print statistics.")
+  boolean stats = false;
+
+  @Option(names = {"-i", "--inspect"}, description = "Inspect.")
+  boolean inspect = false;
+
+  @Option(names = {"-c", "--capacity"}, description = "Capacity.")
+  int capacity = -1;
+
+  @Option(names = {"-n", "--num_workers"}, description = "Number of workers.")
+  int numWorkers = Runtime.getRuntime().availableProcessors();
+
+  @Option(names = {"-e", "--data_errors"}, description = "Print errors in data.")
+  boolean logDataErrors = false;
+
+}
diff --git a/mojo-db-udf/src/main/java/ai/h2o/mojos/db/ReadPropertyValue.java b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/ReadPropertyValue.java
new file mode 100644
index 0000000..b615cdb
--- /dev/null
+++ b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/ReadPropertyValue.java
@@ -0,0 +1,68 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package daimojorunner_db;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Date;
+import java.util.Properties;
+
+/**
+ * Reads single values from a Java properties file.
+ *
+ * NOTE(review): this file lives under ai/h2o/mojos/db but declares package daimojorunner_db;
+ * left unchanged because callers are not visible here - confirm which one is intended.
+ *
+ * @author ericgudgion
+ */
+class ReadPropertyValue {
+    // Last value looked up; kept between calls and returned as-is when a lookup fails.
+    String result = "";
+    // Stream of the properties file opened by the most recent call.
+    InputStream inputStream;
+
+    /**
+     * Looks up one property.
+     *
+     * The file name comes from the system property "propertiesfilename" (default
+     * "DAIMojoRunner_DB.properties"). It is opened from the file system first and from the
+     * class path if that fails. Setting the system property "verbose" to true prints progress.
+     *
+     * Failures while locating or loading the file are printed to standard output and not
+     * rethrown; in that case the value left in {@link #result} by an earlier call (initially
+     * the empty string) is returned.
+     *
+     * @param name property key
+     * @param defaultSetting value used when the key is absent from the file
+     * @return the property value, or {@code defaultSetting} when the key is absent
+     * @throws IOException if closing the properties stream fails
+     */
+    public String getPropValue(String name, String defaultSetting) throws IOException {
+
+        boolean verbose = Boolean.parseBoolean(System.getProperty("verbose", "false"));
+
+        try {
+            Properties prop = new Properties();
+            String propFileName = System.getProperty("propertiesfilename", "DAIMojoRunner_DB.properties");
+            if (verbose) {
+                System.out.println("Using properties file: "+propFileName);
+            }
+
+            try {
+                inputStream = new FileInputStream(propFileName);
+            } catch (Exception e){
+                // Not on the file system: fall back to the class path (may yield null).
+                inputStream = getClass().getClassLoader().getResourceAsStream(propFileName);
+            }
+
+            if (inputStream == null) {
+                throw new FileNotFoundException("property file '" + propFileName + "' not found in the classpath");
+            }
+            prop.load(inputStream);
+
+            result = prop.getProperty(name);
+            if (result == null) {
+                // Explicit null test replaces the former NullPointerException-driven control flow.
+                if (verbose) {
+                    System.out.println("Propertry "+name+" not found in properties file, setting to default");
+                }
+                result = prop.getProperty(name,defaultSetting);
+            }
+            if (verbose) {
+                System.out.println("Property "+name+" = "+result );
+            }
+        } catch (Exception e) {
+            System.out.println("Exception: " + e);
+        } finally {
+            // The stream is null when the file was found nowhere; closing it unconditionally
+            // replaced the reported problem with a NullPointerException.
+            if (inputStream != null) {
+                inputStream.close();
+            }
+        }
+        return result;
+    }
+
+
+}
diff --git a/mojo-db-udf/src/main/java/ai/h2o/mojos/db/SQLCommandConfig.java b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/SQLCommandConfig.java
new file mode 100644
index 0000000..2a6f11a
--- /dev/null
+++ b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/SQLCommandConfig.java
@@ -0,0 +1,93 @@
+package ai.h2o.mojos.db;
+
+import
com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.io.File;
+import java.util.stream.Stream;
+
+import static ai.h2o.mojos.db.Utils.decodeBase64;
+
+/**
+ * Immutable-by-convention view of the "mojo-db-scoring-app" configuration section: model
+ * location, database connection details and the SQL statements used for scoring.
+ */
+public class SQLCommandConfig {
+
+  /** How results are written out; chosen from the leading keyword of the write statement. */
+  enum ExportFormat {
+    CSV,
+    UPDATE,
+    INSERT;
+
+    /**
+     * Classifies a write statement: UPDATE or INSERT when it starts (ignoring case and
+     * surrounding whitespace) with that keyword, CSV otherwise.
+     */
+    public static ExportFormat from(String statement) {
+      // Both keywords are tested against the trimmed text; previously only "update" was,
+      // so an insert statement with leading whitespace was classified as CSV.
+      String trimmed = statement.trim();
+      if (startsWithIgnoreCase(trimmed, "update")) {
+        return UPDATE;
+      }
+      if (startsWithIgnoreCase(trimmed, "insert")) {
+        return INSERT;
+      }
+      return CSV;
+    }
+
+    /** Case-insensitive prefix test that does not depend on the default locale. */
+    private static boolean startsWithIgnoreCase(String text, String prefix) {
+      return text.regionMatches(true, 0, prefix, 0, prefix.length());
+    }
+
+  }
+
+  /** Reads every setting from the given configuration section. */
+  SQLCommandConfig(Config conf) {
+    modelFile = new File(conf.getString("model.file"));
+    dbConnectionString = conf.getString("db.connection");
+    dbUser = conf.getString("db.user");
+    // The configured password is Base64 text; keep the decoded form.
+    dbPassword = decodeBase64(conf.getString("db.password"));
+    dbPasswordPrompt = conf.getBoolean("db.prompt");
+    sqlKey = conf.getString("sql.key");
+    sqlPredictionCol = conf.getString("sql.predictionCol");
+    sqlSelectStatement = conf.getString("sql.select");
+    sqlWriteStatement = conf.getString("sql.write");
+    sqlWriteFormat = ExportFormat.from(sqlWriteStatement);
+    sqlSavePrediction = conf.getInt("sql.savePrediction");
+    sqlFieldSeparator = conf.getString("sql.separator");
+  }
+
+  final File modelFile;
+
+  final String dbConnectionString;
+  final String dbUser;
+  String dbPassword;
+  final Boolean dbPasswordPrompt;
+
+  final String sqlSelectStatement;
+  final String sqlWriteStatement;
+  final ExportFormat sqlWriteFormat;
+  final String sqlKey;
+  final String sqlPredictionCol;
+  int sqlSavePrediction;
+  final String sqlFieldSeparator;
+
+  /** @return true when at least one of user name and password is non-empty */
+  public boolean hasCredentials() {
+    return !Stream.of(dbUser, dbPassword).allMatch(Utils::isEmpty);
+  }
+
+  /** Loads the configuration through {@link ConfigFactory}. */
+  public static class Builder {
+
+    /** Builds the configuration from the "mojo-db-scoring-app" section. */
+    SQLCommandConfig build() {
+      Config conf = ConfigFactory.load().getConfig("mojo-db-scoring-app");
+      return new SQLCommandConfig(conf);
+    }
+
+    /** Points the "config.file" system property at {@code f}; call before {@link #build()}. */
+    Builder loadFrom(File f) {
+      System.setProperty("config.file", f.getAbsolutePath());
+      return this;
+    }
+
+  }
+
+  @Override
+  public String toString() {
+    // Never print the decoded password: an empty one stays '', anything else is masked.
+    String shownPassword = Utils.isEmpty(dbPassword) ? "" : "***";
+    return "SQLCommandConfig{" +
+           "modelFile='" + modelFile + '\'' +
+           ", sqlConnectionString='" + dbConnectionString + '\'' +
+           ", sqlUser='" + dbUser + '\'' +
+           ", sqlPassword='" + shownPassword + '\'' +
+           ", sqlPrompt='" + dbPasswordPrompt + '\'' +
+           ", sqlSelectStatement='" + sqlSelectStatement + '\'' +
+           ", sqlWriteStatement='" + sqlWriteStatement + '\'' +
+           ", sqlWriteFormat=" + sqlWriteFormat +
+           ", sqlKey='" + sqlKey + '\'' +
+           ", sqlPredictionCol='" + sqlPredictionCol + '\'' +
+           ", sqlSavePrediction=" + sqlSavePrediction +
+           ", sqlFieldSeparator='" + sqlFieldSeparator + '\'' +
+           '}';
+  }
+}
diff --git a/mojo-db-udf/src/main/java/ai/h2o/mojos/db/StopWatch.java b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/StopWatch.java
new file mode 100644
index 0000000..3508245
--- /dev/null
+++ b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/StopWatch.java
@@ -0,0 +1,81 @@
+package ai.h2o.mojos.db;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static ai.h2o.mojos.db.Utils.f;
+
+/** Collects named, nanosecond-resolution timing laps. */
+public class StopWatch {
+  // Element type spelled out: iterating a raw List as Lap does not compile.
+  final private List<Lap> laps = new LinkedList<>();
+  private Lap activeLap;
+
+  /** One named measurement; closing it (e.g. via try-with-resources) stops and records it. */
+  class Lap implements AutoCloseable {
+    private final String name;
+    private long startTime;
+    private long endTime;
+
+    private Lap(String name) {
+      this.name = name;
+    }
+
+    Lap start() {
+      startTime = System.nanoTime();
+      return this;
+    }
+
+    Lap stop() {
+      endTime = System.nanoTime();
+      return this;
+    }
+
+    @Override
+    public void close() {
+      stopLap(this);
+    }
+
+    @Override
+    public String toString() {
+      return f("time(%s)=%s", name, StopWatch.toString(endTime-startTime));
+    }
+  }
+
+  /** Starts a new lap and remembers it as the active one. */
+  Lap startLap(String name) {
+    return (activeLap = new Lap(name).start());
+  }
+
+  /** Stops and records the active lap, if there is one. */
+  public void stopLap() {
+    if (activeLap != null) {
+      stopLap(activeLap);
+    }
+  }
+
+  private void stopLap(Lap lap) {
+    lap.stop();
+    laps.add(lap);
+    // Forget the lap once recorded, so that closing it and then calling stopLap()
+    // does not record it a second time.
+    if (lap == activeLap) {
+      activeLap = null;
+    }
+  }
+
+  /** @return all recorded laps, each followed by a comma */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Lap l : laps) {
+      sb.append(l.toString()).append(',');
+    }
+    return sb.toString();
+  }
+
+  /** Formats a duration in nanoseconds using the largest non-zero unit as the leading field. */
+  public static String toString(long nanosec) {
+    final long hr = TimeUnit.NANOSECONDS.toHours(nanosec);
+    nanosec -= TimeUnit.HOURS.toNanos(hr);
+    final long min = TimeUnit.NANOSECONDS.toMinutes(nanosec);
+    nanosec -= TimeUnit.MINUTES.toNanos(min);
+    final long sec = TimeUnit.NANOSECONDS.toSeconds(nanosec);
+    nanosec -= TimeUnit.SECONDS.toNanos(sec);
+    final long ms = TimeUnit.NANOSECONDS.toMillis(nanosec);
+    nanosec -= TimeUnit.MILLISECONDS.toNanos(ms);
+    final long us = TimeUnit.NANOSECONDS.toMicros(nanosec);
+    nanosec -= TimeUnit.MICROSECONDS.toNanos(us);
+    if( hr != 0 ) return String.format("%2d:%02d:%02d.%03d", hr, min, sec, ms);
+    if( min != 0 ) return String.format("%2d min %2d.%03d sec", min, sec, ms);
+    if( sec != 0 ) return String.format("%2d.%03d sec", sec, ms);
+    if( ms != 0 ) return String.format("%03d.%03d msec", ms, us);
+    if( us != 0 ) return String.format("%03d.%03d usec", us, nanosec);
+    return String.format("%3d nanosec", nanosec);
+  }
+}
diff --git a/mojo-db-udf/src/main/java/ai/h2o/mojos/db/Utils.java b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/Utils.java
new file mode 100644
index 0000000..6428cb1
--- /dev/null
+++ b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/Utils.java
@@ -0,0 +1,69 @@
+package ai.h2o.mojos.db;
+
+import org.slf4j.Logger;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Base64;
+import java.util.concurrent.TimeUnit;
+
+/** Small static helpers shared by the scoring application. */
+public class Utils {
+
+  /** @return true for {@code null} and for the empty string */
+  public static final boolean isEmpty(String s) {
+    return s == null || s.isEmpty();
+  }
+
+  /** Shorthand for {@link String#format(String, Object...)}. */
+  public static String f(String s, Object ...args) {
+    return String.format(s, args);
+  }
+
+  /** @return true when the "os.name" system property starts with "windows" (any case) */
+  public static boolean isWindows() {
+    return System.getProperty("os.name").toLowerCase().startsWith("windows");
+  }
+
+  /**
+   * Decodes Base64 text; {@code null} and the empty string are returned unchanged.
+   * The decoded bytes are read as UTF-8 so the result does not vary with the platform
+   * default charset.
+   */
+  public static String decodeBase64(String s) {
+    return isEmpty(s)
+        ? s
+        : new String(Base64.getDecoder().decode(s), java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Opens the database connection described by {@code cmdConfig}.
+   *
+   * @return the connection, or {@code null} when the write format is CSV (no database needed)
+   * @throws SQLException if the driver cannot connect
+   */
+  public static Connection createConnection(SQLCommandConfig cmdConfig, Logger logger) throws SQLException {
+    Connection connection = null;
+    if (cmdConfig.sqlWriteFormat != SQLCommandConfig.ExportFormat.CSV) {
+      if (!cmdConfig.hasCredentials()) {
+        logger.debug("Connection string without seperate SQLUser and SQLPassword details.");
+        connection = DriverManager.getConnection(cmdConfig.dbConnectionString);
+      } else {
+        if (isWindows()) {
+          // On Windows the credentials are appended to the connection string itself.
+          logger.debug("Connection string using Windows connection string.");
+          connection =
+              DriverManager.getConnection(
+                  f("%s;user=%s;password=%s",
+                    cmdConfig.dbConnectionString,
+                    cmdConfig.dbUser,
+                    cmdConfig.dbPassword
+                  )
+              );
+        } else {
+          logger.debug("Connection string used separate SQL parameters.");
+          connection =
+              DriverManager.getConnection(
+                  cmdConfig.dbConnectionString,
+                  cmdConfig.dbUser,
+                  cmdConfig.dbPassword
+              );
+        }
+      }
+      logger.debug("Connected to database.");
+    }
+    return connection;
+  }
+
+  /** Concatenates {@code ss}, placing {@code sep} between consecutive items. */
+  public static String join(String[] ss, String sep) {
+    StringBuilder sb = new StringBuilder();
+    // Separate by position: testing the builder length dropped the separator after
+    // leading empty items.
+    for (int i = 0; i < ss.length; i++) {
+      if (i > 0) sb.append(sep);
+      sb.append(ss[i]);
+    }
+    return sb.toString();
+  }
+}
diff --git a/mojo-db-udf/src/main/java/ai/h2o/mojos/db/Worker.java b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/Worker.java
new file mode 100644
index 0000000..427dabb
--- /dev/null
+++ b/mojo-db-udf/src/main/java/ai/h2o/mojos/db/Worker.java
@@ -0,0 +1,263 @@
+package ai.h2o.mojos.db;
+
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.IntStream;
+
+import ai.h2o.mojos.runtime.MojoPipeline;
+import ai.h2o.mojos.runtime.frame.MojoFrame;
+import ai.h2o.mojos.runtime.frame.MojoFrameBuilder;
+import ai.h2o.mojos.runtime.frame.MojoRowBuilder;
+
+import static ai.h2o.mojos.db.Utils.createConnection;
+import static ai.h2o.mojos.db.Utils.isEmpty;
+
+/**
+ * Queue consumer: takes rows from the queue, scores each with the model and writes the
+ * prediction either to the database or, in CSV mode, to standard output. Stops when it takes
+ * {@link #POISON_PILL} or is interrupted.
+ */
+public class Worker implements Runnable {
+
+  /** End-of-stream marker; a worker that takes it from the queue terminates. */
+  static final String POISON_PILL = "__No_More_Work__";
+
+  static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(Worker.class);
+
+  private final MojoPipeline model;
+  // Element type spelled out: assigning take() of a raw queue to a String does not compile.
+  private final BlockingQueue<String> q;
+  private final SQLCommandConfig cmdConfig;
+  private final Args args;
+
+  private int numRowsScored = 0;
+  private int numRowsRead = 0;
+  private int numRowError = 0;
+
+  Worker(BlockingQueue<String> q, MojoPipeline model, SQLCommandConfig cmdConfig, Args args) {
+    this.q = q;
+    this.model = model;
+    this.cmdConfig = cmdConfig;
+    this.args = args;
+  }
+
+  /**
+   * Processes rows until the end-of-stream marker or an interrupt. Each row is the key value
+   * followed by the feature values, joined by the configured field separator.
+   */
+  public void run() {
+    MojoFrameBuilder frameBuilder = model.getInputFrameBuilder();
+    MojoRowBuilder rowBuilder = frameBuilder.getMojoRowBuilder();
+    MojoFrame iframe, oframe;
+
+    String[] features = model.getInputMeta().getColumnNames();
+    // Rows are split on the separator as literal text (not as a regular expression), and
+    // trailing empty fields are kept so that empty last columns do not shorten the row.
+    String separatorPattern = java.util.regex.Pattern.quote(cmdConfig.sqlFieldSeparator);
+
+    try (Connection connection = createConnection(cmdConfig, LOGGER)) {
+      try (Statement statement = createStatement(connection)) {
+        while (true) {
+          // One stop watch per row: a single shared instance kept every lap of every row
+          // in memory for the lifetime of the worker.
+          StopWatch timer = new StopWatch();
+          String row;
+          // Take the row to score from the queue
+          try(StopWatch.Lap lapTakeRow = timer.startLap("Get row from queue")) {
+            row = q.take();
+            numRowsRead++;
+          } catch (InterruptedException ex) {
+            // Somebody signalled an interrupt: keep the flag set for the caller and finish
+            Thread.currentThread().interrupt();
+            break;
+          }
+          LOGGER.debug("Processing row: row={}", row);
+          LOGGER.debug("Target update column: sqlPredictionCol={}", cmdConfig.sqlPredictionCol);
+
+          // Terminate if the end-of-stream marker was retrieved
+          if (row.equals(POISON_PILL)) {
+            numRowsRead--;
+            if (args.stats) {
+              LOGGER.info("Rows statistics: read={}, scored={}, errors={}, queue length={}",
+                          numRowsRead, numRowsScored, numRowError, q.size());
+            }
+            break;
+          }
+
+          // Fill row builder
+          String rowId;
+          try(StopWatch.Lap lap = timer.startLap("Row parsing")) {
+            String[] fileline = row.split(separatorPattern, -1);
+            rowId = fileline[0];
+
+            if (!parseRow(row, fileline, features, rowBuilder))
+              continue;
+          }
+
+          // Make a prediction
+          try(StopWatch.Lap lap = timer.startLap("MOJO prediction")) {
+            frameBuilder = model.getInputFrameBuilder();
+            frameBuilder.addRow(rowBuilder);
+            iframe = frameBuilder.toMojoFrame();
+            oframe = model.transform(iframe);
+          }
+
+          String[] prediction = null;
+          // Single prediction
+          String result = row + " ";
+          // Multi prediction
+          String resultM = "";
+
+          try (StopWatch.Lap lap = timer.startLap("Preparing output statement")) {
+            if (!isEmpty(cmdConfig.sqlPredictionCol)) {
+              prediction = oframe.getColumn(cmdConfig.sqlSavePrediction).getDataAsStrings();
+              result = prediction[0];
+            } else {
+              for (int r = 0; r < oframe.getNcols(); r++) {
+                if (cmdConfig.sqlWriteFormat != SQLCommandConfig.ExportFormat.CSV) {
+                  // if not writing CSV do not format the data
+                  resultM = resultM + "" + normalizeColumnName(oframe.getColumnName(r)) + "=";
+                }
+                prediction = oframe.getColumn(r).getDataAsStrings();
+                for (int a = 0; a < prediction.length; a++) {
+                  if (cmdConfig.sqlWriteFormat == SQLCommandConfig.ExportFormat.CSV) {
+                    resultM = resultM + prediction[a];
+                  } else {
+                    resultM = resultM + "'" + prediction[a] + "'";
+                  }
+                  if (r < oframe.getNcols() - 1) {
+                    resultM = resultM + ",";
+                  }
+                }
+              }
+            }
+          }
+
+          LOGGER.debug("Update single field: field={}, result={}", cmdConfig.sqlSavePrediction, result);
+          LOGGER.debug("Update multiple fields: result={}", resultM);
+
+          // Placeholders are substituted as plain text, so '$' or '\' in a key name or row id
+          // is not taken for a regex replacement reference; quotes in the row id are doubled.
+          // NOTE(review): the statement is still assembled as text from data; a
+          // PreparedStatement would be the robust form but needs a different template.
+          String sqlUpdateStm = cmdConfig.sqlWriteStatement
+              .replace("@KEY@", cmdConfig.sqlKey)
+              .replace("@ROWID@", sqlQuote(rowId));
+
+          try (StopWatch.Lap lap = timer.startLap("Preparing output statement #2")) {
+
+            if (isEmpty(cmdConfig.sqlPredictionCol)) {
+              if (cmdConfig.sqlWriteFormat == SQLCommandConfig.ExportFormat.UPDATE) {
+                sqlUpdateStm = replaceFirstLiteral(sqlUpdateStm, "@RESULT@", resultM);
+              } else if (cmdConfig.sqlWriteFormat == SQLCommandConfig.ExportFormat.INSERT) {
+                // Re-split the name='value' list into alternating names and values
+                resultM = resultM.replaceAll("\\\"|'|", "");
+                String[] resultdata = resultM.split("=| |,");
+                String colnames = "";
+                String colvalues = "";
+                if (cmdConfig.sqlKey.length() != 0) {
+                  colnames = "\"" + cmdConfig.sqlKey + "\"";
+                  colvalues = sqlQuote(rowId);
+                }
+                for (int i = 0; i < resultdata.length; i = i + 2) {
+                  if (colnames.length() > 1) {
+                    colnames = colnames + ",\"" + resultdata[i] + "\"";
+                  } else {
+                    colnames = "\"" + resultdata[i] + "\"";
+                  }
+                  if (colvalues.length() > 0) {
+                    colvalues = colvalues + ",'" + resultdata[i + 1] + "'";
+                  } else {
+                    colvalues = "'" + resultdata[i + 1] + "'";
+                  }
+                }
+                sqlUpdateStm = replaceFirstLiteral(sqlUpdateStm, "@COLNAMES@", colnames);
+                sqlUpdateStm = replaceFirstLiteral(sqlUpdateStm, "@COLVALUES@", colvalues);
+              } else if (cmdConfig.sqlWriteFormat == SQLCommandConfig.ExportFormat.CSV) {
+                sqlUpdateStm = rowId + "," + resultM;
+              }
+            } else {
+              if (cmdConfig.sqlWriteFormat == SQLCommandConfig.ExportFormat.UPDATE) {
+                sqlUpdateStm =
+                    replaceFirstLiteral(sqlUpdateStm, "@PREDICTION_COL@", cmdConfig.sqlPredictionCol);
+                sqlUpdateStm = replaceFirstLiteral(sqlUpdateStm, "@PREDICTION@", sqlQuote(result));
+              } else if (cmdConfig.sqlWriteFormat == SQLCommandConfig.ExportFormat.INSERT) {
+                sqlUpdateStm = replaceFirstLiteral(sqlUpdateStm, "@PREDICTION@", result);
+              }
+              if (cmdConfig.sqlWriteFormat == SQLCommandConfig.ExportFormat.CSV) {
+                // save just the value for the CSV updates
+                sqlUpdateStm = rowId + "," + result;
+              }
+              LOGGER.debug("Prepared result: row: {}, result {}", row, result);
+            }
+          }
+
+          LOGGER.debug("Updating record: {}", sqlUpdateStm);
+
+          try(StopWatch.Lap lap = timer.startLap("Updating DB with prediction")) {
+            if (cmdConfig.sqlWriteFormat == SQLCommandConfig.ExportFormat.CSV) {
+              System.out.println(sqlUpdateStm);
+            } else {
+              try {
+                statement.executeUpdate(sqlUpdateStm);
+              } catch (SQLException ex) {
+                LOGGER.error("Cannot execute update statement", ex);
+              }
+            }
+          }
+
+          // Build the timing text only when it will actually be logged
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Timing: {}", timer.toString());
+          }
+          numRowsScored++;
+        }
+      }
+    } catch (SQLException e) {
+      LOGGER.error("Cannot create DB connection!", e);
+    }
+  }
+
+  /** @return a new statement, or {@code null} when there is no connection (CSV mode) */
+  private Statement createStatement(Connection conn) throws SQLException {
+    return conn != null ? conn.createStatement() : null;
+  }
+
+  /** Wraps a value in single quotes, doubling any single quote it contains. */
+  private static String sqlQuote(String value) {
+    return "'" + value.replace("'", "''") + "'";
+  }
+
+  /** Replaces the first occurrence of {@code token} in {@code text} with {@code value}, literally. */
+  private static String replaceFirstLiteral(String text, String token, String value) {
+    int at = text.indexOf(token);
+    if (at < 0) {
+      return text;
+    }
+    return text.substring(0, at) + value + text.substring(at + token.length());
+  }
+
+  /**
+   * Copies the feature values of one split row into the row builder.
+   *
+   * @param row the unsplit row, used for logging only
+   * @param fileline the split row: key value first, then one value per feature
+   * @param features input feature names, in row order
+   * @param rowBuilder builder that receives the values
+   * @return false when the field count is wrong or any value was rejected by the builder
+   */
+  private boolean parseRow(String row,
+                           String[] fileline,
+                           String[] features,
+                           MojoRowBuilder rowBuilder) {
+    assert rowBuilder.size() == features.length : "RowBuilder does not match number of input features!";
+
+    if (LOGGER.isDebugEnabled()) { // protect expensive debug statement
+      LOGGER.debug("Row split: {}",
+                   Arrays.toString(
+                       IntStream.range(0, fileline.length).mapToObj(i -> i + "=" + fileline[i]).toArray()
+                   ));
+    }
+
+    if (fileline.length != features.length + 1) {
+      numRowError++;
+      LOGGER.debug("Bad row: wrong number of features! row={}, len={}, expected features={}",
+                   row, fileline.length, features.length);
+      return false;
+    }
+    int fieldErrors = 0;
+    for (int f = 0; f < rowBuilder.size(); f++) {
+      if (fileline.length <= (f + 1)) {
+        // parsing can be very strange.... for some records
+        rowBuilder.setValue(features[f], "");
+      } else {
+        // The text "null" (any case) is passed to the builder as an empty value
+        if (fileline[1 + f].equalsIgnoreCase("null")) {
+          rowBuilder.setValue(features[f], "");
+        } else {
+          try {
+            rowBuilder.setValue(features[f], fileline[1 + f]);
+          } catch (Exception ex) {
+            numRowError++;
+            fieldErrors++;
+            LOGGER.debug("Bad row field! feature={}, value={}", f, fileline[1+f]);
+          }
+        }
+
+      }
+    }
+    return fieldErrors == 0;
+  }
+
+  /** Replaces '.' and '-' in a column name with '_'. */
+  static String normalizeColumnName(String name) {
+    return name.replace('.', '_').replace('-', '_');
+  }
+}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/mojo-db-udf/src/main/resources/log4j2.xml b/mojo-db-udf/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..d4509f8
--- /dev/null
+++ b/mojo-db-udf/src/main/resources/log4j2.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/mojo-db-udf/src/main/resources/reference.conf b/mojo-db-udf/src/main/resources/reference.conf
new file mode 100644
index 0000000..6298074
--- /dev/null
+++ b/mojo-db-udf/src/main/resources/reference.conf
@@ -0,0 +1,41 @@
+mojo-db-scoring-app {
+  model {
+    // Location of model pipeline in MOJO format
+    file = "pipeline.mojo"
+  }
+  db {
+    // Database connection string
+    connection = ""
+    // Database user
+    user = "postgres"
+    // User password in BASE64 encoding
+    password = ""
+    // Ask for password true/false
+    prompt = false
+  }
+
+  sql {
+    // ID column
+    key = "id"
+    // Name of column to store prediction if `write` contains update or insert statement
+    predictionCol = ""
+    // Select statement of rows to score, the first selected column needs to be named as key above
+    // Note: the key column can be referenced by ${mojo-db-scoring-app.sql.key}
+    select = ""
+    // Table update/insert statement or CSV
+    // @KEY@ - replaced by key column name
+    // @ROWID@ - replaced by value of key column
+    // @COLNAMES@ - replaced by list of column names
+    // @COLVALUES@ - replaced by list of values
+    // @PREDICTION_COL@ - replaced by name of column to hold prediction, ${mojo-db-scoring-app.sql.predictionCol}
+    // @PREDICTION@ - replaced by prediction
+    // @RESULT@ - replaced by update of multiple fields representing prediction (e.g., bad_loan_0='0.2', bad_loan_1='0.8')
+    write= ""
+    // Index of column which contains prediction
+    savePrediction
= 0
+    // Internal field separator
+    separator = ","
+
+  }
+}
+
diff --git a/mojo-db-udf/src/test/java/ai/h2o/mojos/db/SQLCommandConfigTest.java b/mojo-db-udf/src/test/java/ai/h2o/mojos/db/SQLCommandConfigTest.java
new file mode 100644
index 0000000..1ccad73
--- /dev/null
+++ b/mojo-db-udf/src/test/java/ai/h2o/mojos/db/SQLCommandConfigTest.java
@@ -0,0 +1,15 @@
+package ai.h2o.mojos.db;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/** Checks the values obtained when no configuration file is supplied. */
+public class SQLCommandConfigTest {
+
+  @Test
+  public void testLoad() {
+    SQLCommandConfig cfg = new SQLCommandConfig.Builder().build();
+    assertEquals("SQLCommandConfig{modelFile='pipeline.mojo', sqlConnectionString='', sqlUser='postgres', sqlPassword='', sqlPrompt='false', sqlSelectStatement='', sqlWriteStatement='', sqlWriteFormat=CSV, sqlKey='id', sqlPredictionCol='', sqlSavePrediction=0, sqlFieldSeparator=','}",
+                 cfg.toString());
+  }
+}
\ No newline at end of file
diff --git a/mojo-db-udf/src/test/java/ai/h2o/mojos/db/WorkerTest.java b/mojo-db-udf/src/test/java/ai/h2o/mojos/db/WorkerTest.java
new file mode 100644
index 0000000..acc125c
--- /dev/null
+++ b/mojo-db-udf/src/test/java/ai/h2o/mojos/db/WorkerTest.java
@@ -0,0 +1,127 @@
+package ai.h2o.mojos.db;
+
+import com.typesafe.config.ConfigFactory;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import ai.h2o.mojos.runtime.MojoPipeline;
+import ai.h2o.mojos.runtime.readers.MojoPipelineReaderBackendFactory;
+
+import static org.junit.Assert.*;
+import static ai.h2o.mojos.db.Utils.f;
+import static ai.h2o.mojos.db.Utils.join;
+
+/**
+ * Runs a Worker against an in-memory database for both the single-column and the
+ * multi-column update statement and checks the stored predictions.
+ */
+public class WorkerTest {
+  private static SQLCommandConfig cfgSingle;
+  private static SQLCommandConfig cfgMulti;
+  private static MojoPipeline model;
+  // Element types spelled out: the lambdas below index into the rows, which does not
+  // compile against raw collection types.
+  private BlockingQueue<String> queue;
+  private Args args;
+
+  static final String[] IRIS_DATA = new String[] {
+      "6.7,2.5,5.8,1.8",
+      "7.2,3.6,6.1,2.5",
+      "6.5,3.2,5.1,2.0",
+      "6.4,2.7,5.3,1.9",
+      "6.8,3.0,5.5,2.1",
+  };
+
+  @BeforeClass
+  public static void beforeAll() throws Exception {
+    cfgSingle = new SQLCommandConfig(ConfigFactory.load("worker_test_update_single").getConfig("mojo-db-scoring-app"));
+    cfgMulti = new SQLCommandConfig(ConfigFactory.load("worker_test_update_multi").getConfig("mojo-db-scoring-app"));
+    model = MojoPipeline.loadFrom(
+        MojoPipelineReaderBackendFactory.createReaderBackend(
+            ClassLoader.getSystemResourceAsStream("pipeline.mojo")));
+    fillH2();
+  }
+
+  /** Queues every data row, prefixed with its index as key, followed by the end marker. */
+  @Before
+  public void before() throws Exception {
+    queue = new ArrayBlockingQueue<>(IRIS_DATA.length+1);
+    for (int i = 0; i < IRIS_DATA.length; i++) {
+      queue.put(f("%d,%s", i, IRIS_DATA[i]));
+    }
+    queue.put(Worker.POISON_PILL);
+    args = new Args();
+    args.stats = true;
+  }
+
+  @Test
+  public void testUpdateSingle() throws Exception {
+    Worker w = new Worker(queue, model, cfgSingle, args);
+    w.run();
+
+    List<double[]> actPred = getPredictions(cfgSingle, new String[] {"prediction"}, "iris_table_single");
+    // Guard against a vacuous allMatch on an empty result
+    assertEquals(IRIS_DATA.length, actPred.size());
+    assertTrue(actPred.stream().allMatch(p -> p[0] > 0));
+  }
+
+  @Test
+  public void testUpdateMulti() throws Exception {
+    Worker w = new Worker(queue, model, cfgMulti, args);
+    w.run();
+
+    List<double[]> actPred = getPredictions(cfgMulti,
+                                  new String[] {"species_Iris_setosa", "species_Iris_versicolor", "species_Iris_virginica"},
+                                  "iris_table_multi");
+    // Guard against a vacuous allMatch on an empty result
+    assertEquals(IRIS_DATA.length, actPred.size());
+    assertTrue(actPred.stream().allMatch(p -> p[0] > 0 && p[1] > 0 && p[2] > 0));
+  }
+
+  /** Reads the given columns of every row of {@code table} as doubles. */
+  static List<double[]> getPredictions(SQLCommandConfig cfg, String[] selectCol, String table) throws SQLException {
+    try(Connection c = DriverManager.getConnection(cfg.dbConnectionString, cfg.dbUser, cfg.dbPassword)) {
+      try (Statement st = c.createStatement();
+           ResultSet rs = st.executeQuery(f("select %s from %s", join(selectCol, ","), table))) {
+        List<double[]> result = new LinkedList<>();
+        while (rs.next()) {
+          double[] res = new double[selectCol.length];
+          for (int i = 0; i < selectCol.length; i++) res[i] = rs.getDouble(selectCol[i]);
+          result.add(res);
+        }
+        return result;
+      }
+    }
+  }
+
+  /** (Re)creates both test tables and inserts the input rows; prediction columns stay unset. */
+  static void fillH2() throws SQLException {
+    // Fill H2
+    try(Connection c = DriverManager.getConnection(cfgSingle.dbConnectionString, cfgSingle.dbUser, cfgSingle.dbPassword)) {
+      try (Statement st = c.createStatement()) {
+        // Simple input data
+        st.execute("drop table if exists iris_table_single");
+        st.execute("drop table if exists iris_table_multi");
+
+        st.execute("create table iris_table_single "
+                   + "(id int primary key, sepal_len double, sepal_wid double, petal_len double, petal_wid double, "
+                   + "prediction double)");
+        st.execute("create table iris_table_multi "
+                   + "(id int primary key, sepal_len double, sepal_wid double, petal_len double, petal_wid double, "
+                   + "species_Iris_setosa double, species_Iris_versicolor double, species_Iris_virginica double)");
+
+        for (String t : new String[] {"iris_table_single", "iris_table_multi"}) {
+          for (int i = 0; i < IRIS_DATA.length; i++) {
+            st.execute(
+                f(
+                    "insert into %s (id,sepal_len,sepal_wid,petal_len,petal_wid) values(%d,%s)",
+                    t,
+                    i,
+                    IRIS_DATA[i]
+                )
+            );
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/mojo-db-udf/src/test/resources/log4j2.xml b/mojo-db-udf/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..fb771e1
--- /dev/null
+++ b/mojo-db-udf/src/test/resources/log4j2.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/mojo-db-udf/src/test/resources/pipeline.mojo b/mojo-db-udf/src/test/resources/pipeline.mojo
new file mode 100644
index 0000000..8ae2db5
Binary files /dev/null and b/mojo-db-udf/src/test/resources/pipeline.mojo differ
diff --git a/mojo-db-udf/src/test/resources/worker_test_update_multi.conf b/mojo-db-udf/src/test/resources/worker_test_update_multi.conf
new
file mode 100644
index 0000000..f42ef38
--- /dev/null
+++ b/mojo-db-udf/src/test/resources/worker_test_update_multi.conf
@@ -0,0 +1,19 @@
+// Test configuration for the multi-column update: every output column of the model is
+// written through the @RESULT@ placeholder.
+mojo-db-scoring-app {
+  model {
+    // Location of model pipeline in MOJO format
+    file = "pipeline.mojo"
+  }
+  db {
+    // Database connection string
+    connection = "jdbc:h2:mem:myDb;DB_CLOSE_DELAY=-1"
+    // Database user
+    user = "sa"
+    // User password; SQLCommandConfig passes this value through BASE64 decoding
+    password = "sa"
+  }
+
+  sql {
+    // Key column, substituted for @KEY@
+    key = "id"
+    // @RESULT@ is replaced by the list of column='value' assignments, @ROWID@ by the quoted key value
+    write= "update iris_table_multi set @RESULT@ where @KEY@ = @ROWID@"
+  }
+}
diff --git a/mojo-db-udf/src/test/resources/worker_test_update_single.conf b/mojo-db-udf/src/test/resources/worker_test_update_single.conf
new file mode 100644
index 0000000..982f652
--- /dev/null
+++ b/mojo-db-udf/src/test/resources/worker_test_update_single.conf
@@ -0,0 +1,20 @@
+// Test configuration for the single-column update: one prediction value is written to
+// the column named by predictionCol.
+mojo-db-scoring-app {
+  model {
+    // Location of model pipeline in MOJO format
+    file = "pipeline.mojo"
+  }
+  db {
+    // Database connection string
+    connection = "jdbc:h2:mem:myDb;DB_CLOSE_DELAY=-1"
+    // Database user
+    user = "sa"
+    // User password; SQLCommandConfig passes this value through BASE64 decoding
+    password = "sa"
+  }
+
+  sql {
+    // Key column, substituted for @KEY@
+    key = "id"
+    // Column that receives the prediction; a non-empty value selects the single-prediction path
+    predictionCol = "prediction"
+    // The column name is spliced in by substitution; @PREDICTION@ is replaced by the quoted prediction
+    write= "update iris_table_single set "${mojo-db-scoring-app.sql.predictionCol}"=@PREDICTION@ where @KEY@ = @ROWID@"
+  }
+}