Merge pull request #359 from tigergraph/DOC-1885-DataLakeLoadingTo3.8
AddingTo3.8_DataLakeLoading
victorleeTG authored Nov 3, 2023
2 parents 65881a5 + 1e3b998 commit 726de7c
Showing 1 changed file with 290 additions and 3 deletions: modules/data-loading/pages/spark-connection-via-jdbc-driver.adoc

Using the open-source type 4 JDBC Driver for TigerGraph, you can read and write data between Spark and TigerGraph.

image::screen-shot-2019-09-19-at-5.53.40-pm.png["Diagram showing streaming and static data sources using Apache Spark and the TigerGraph JDBC server to perform two-way communication with TigerGraph services. The TigerGraph services shown are Graph Insights and applications, graph business intelligence, and graph visualization, all under the theme of Real-Time Graph Analytics."]

The GitHub link to the JDBC Driver is https://github.com/tigergraph/ecosys/tree/master/tools/etl/tg-jdbc-driver +

The README file there provides more details.

[WARNING]
====
If you use a Spark job to connect to TigerGraph via JDBC, we recommend capping your concurrent Spark loading jobs at 10, with the following per-job configuration.
With 10 jobs of 2 executors each and 2 cores per executor, this limits the concurrent JDBC connections to 40.
----
--num-executors 2 /* 2 executors per job */
--executor-cores 2 /* each executor takes 2 cores */
----
====

== Load from a Data Lake via Spark

=== 1) Write a Spark `DataFrameWriter`

Write a Spark `DataFrameWriter` that writes the data out in CSV format, following the example below.

NOTE: Choose the names of the GSQL loading job and its data files now; you will use the same names in Step 2.

.Example: `DataFrameWriter` as "df"
[source, gsql]
df.write.mode("overwrite").format("jdbc").options(
Map(
"driver" -> "com.tigergraph.jdbc.Driver",
"url" -> "jdbc:tg:http://host:port",
"username" -> "tigergraph",
"password" -> "tigergraph",
"token" -> "token",
"graph" -> "Social", // graph name
"dbtable" -> "job load_Social", // loading job name, "job" is required
"filename" -> "file1", // filename defined in the loading job
"sep" -> ",", // separator between columns
"eol" -> "\n", // end of line
"batchsize" -> "10000"))
.save()

The `sep` and `eol` values are used to convert the `DataFrame` into delimited data, which is then sent to the filename defined in the loading job.

.Example: DataFrame
[source, gsql]
----
> DataFrame
+-----+---+------+
|  _c0|_c1|   _c2|
+-----+---+------+
|  Tom| 23|  male|
|Jerry| 45|  male|
|Jenny| 33|female|
|Lizzy| 19|female|
+-----+---+------+
----

.Example: Delimited Data
[source, gsql]
----
Tom,23,male
Jerry,45,male
Jenny,33,female
Lizzy,19,female
----

=== 2) Create a Loading Job
Write a GSQL loading job, using the job and file names from Step 1, to map data from the CSV file(s) to TigerGraph vertices and edges.

.Example:
[source, gsql]
----
CREATE LOADING JOB load_Social FOR GRAPH Social {
  DEFINE FILENAME file1;
  DEFINE FILENAME file2;
  LOAD file1 TO VERTEX Person VALUES ($0, $1, $2);
  LOAD file2 TO EDGE Friendship VALUES ($0, $1);
}
----

The loading job above, `load_Social`, loads the 1st, 2nd, and 3rd columns of the source file `file1` to the 1st, 2nd, and 3rd attributes of the vertex type `Person`.
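
The example in Step 1 writes only `file1`. As a rough sketch (not part of the original example), a second DataFrame holding person-to-person pairs could be written to `file2` with the same connector options, changing only the `filename` value; the DataFrame name `edgeDf` is hypothetical.

.Example: writing edge data to `file2` (sketch)
[source, gsql]
----
// Hypothetical DataFrame "edgeDf" with two columns: person1, person2.
// Same connector options as the file1 example; only "filename" differs.
edgeDf.write.mode("overwrite").format("jdbc").options(
  Map(
    "driver" -> "com.tigergraph.jdbc.Driver",
    "url" -> "jdbc:tg:http://host:port",
    "username" -> "tigergraph",
    "password" -> "tigergraph",
    "token" -> "token",
    "graph" -> "Social",
    "dbtable" -> "job load_Social", // same loading job
    "filename" -> "file2",          // the second filename defined in the loading job
    "sep" -> ",",
    "eol" -> "\n",
    "batchsize" -> "10000"))
  .save()
----
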
//Alternatively, loading jobs can be run as post requests.
//.Example: Post Request to TigerGraph
//[source, gsql]
//http://host:port/restpp/ddl/Social?tag=load_Social&filename=file1
//--data <delimited_data>
See the pages xref:gsql-ref:ddl-and-loading:creating-a-loading-job.adoc[], xref:gsql-ref:ddl-and-loading:running-a-loading-job.adoc[] and xref:tigergraph-server:API:built-in-endpoints.adoc#_loading_jobs[Loading Jobs as a REST Endpoint] for more information about loading jobs in TigerGraph.

== Advanced Usages with Spark

=== Enable SSL with Spark

Add the following options to your Scala script:

[source, gsql]
----
"trustStore" -> "trust.jks",
"trustStorePassword" -> "password",
"trustStoreType" -> "JKS",
----

Then run it with the `--files` option:

[source, gsql]
----
/path/to/spark/bin/spark-shell --jars /path/to/tigergraph-jdbc-driver-${VERSION}.jar --files /path/to/trust.jks -i test.scala
----

Pass the full path of the JKS file to `--files`, while `"trustStore" -> "trust.jks"` takes the JKS filename only.
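
For illustration, the following is a hedged sketch of how these truststore options combine with the usual connection options in a single options `Map`; the host, port, and password values are placeholders.

[source, gsql]
----
// Sketch only: an HTTPS URL plus the truststore options from the snippet above.
df.write.mode("append").format("jdbc").options(
  Map(
    "driver" -> "com.tigergraph.jdbc.Driver",
    "url" -> "jdbc:tg:https://host:port",   // note https
    "username" -> "tigergraph",
    "password" -> "tigergraph",
    "graph" -> "Social",
    "dbtable" -> "job load_Social",
    "filename" -> "file1",
    "sep" -> ",",
    "eol" -> "\n",
    "trustStore" -> "trust.jks",            // filename only; the full path goes to --files
    "trustStorePassword" -> "password",
    "trustStoreType" -> "JKS"))
  .save()
----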

=== Load Statistics

[WARNING]
====
The TigerGraph JDBC connector streams data in via REST endpoints.
No data throttle mechanism is in place yet.
When the number of incoming concurrent JDBC connections exceeds the configured hardware capacity, the overload may cause the system to stop responding.
If you use a Spark job to connect to TigerGraph via JDBC, we recommend capping your concurrent Spark loading jobs at 10, with the following per-job configuration.
With 10 jobs of 2 executors each and 2 cores per executor, this limits the concurrent JDBC connections to 40.
====

To load data from files:

.Example: 2 executors per job, where each executor takes 2 cores.
[source, gsql]
----
/path/to/spark/bin/spark-shell --jars /path/to/tigergraph-jdbc-driver-${VERSION}.jar --num-executors 2 --executor-cores 2 -i test.scala
----

.Example: Invoke loading job
[source, gsql]
----
val df = sc.textFile("/path/to/your_file", 100).toDF()
df.write.mode("append").format("jdbc").options(
  Map(
    "driver" -> "com.tigergraph.jdbc.Driver",
    "url" -> "jdbc:tg:http://127.0.0.1:14240",
    "username" -> "tigergraph",
    "password" -> "tigergraph",
    "graph" -> "ldbc_snb",
    "dbtable" -> "job load_ldbc_snb", // loading job name
    "filename" -> "v_comment_file", // filename defined in the loading job
    "sep" -> "|", // separator between columns
    "eol" -> "\n", // end of line
    "batchsize" -> "10000",
    "debug" -> "0",
    "logFilePattern" -> "/tmp/jdbc.log")).save()
----

*If your TigerGraph version is 3.9.0 or higher, you can use the following new connection properties (see the sketch below):*

* `jobid`: Since Spark loading sends data in multiple batches, it is hard to collect the loading statistics of all the batches.
The `jobid` connection property aggregates the statistics of each batch load, so the overall loading statistics can be easily acquired.
* `max_num_error`: The threshold for the number of error objects within the `jobid`.
The loading job is aborted when the limit is reached. `jobid` is required.
* `max_percent_error`: The threshold for the percentage of error objects within the `jobid`.
The loading job is aborted when the limit is reached. `jobid` is required.

NOTE: For a more detailed example, please refer to the https://github.com/tigergraph/ecosys/blob/master/tools/etl/tg-jdbc-driver/tg-jdbc-examples/src/main/java/com/tigergraph/jdbc/examples/SparkLoadingJob.scala#L55-L57[GitHub link].
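
For illustration, here is a hedged sketch of these properties added to the earlier write options; the `jobid` value and the thresholds are arbitrary examples, not values from the original documentation.

.Example: aggregating loading statistics with `jobid` (sketch)
[source, gsql]
----
// Assumes TigerGraph 3.9.0 or higher; jobid and thresholds are placeholder values.
df.write.mode("append").format("jdbc").options(
  Map(
    "driver" -> "com.tigergraph.jdbc.Driver",
    "url" -> "jdbc:tg:http://127.0.0.1:14240",
    "username" -> "tigergraph",
    "password" -> "tigergraph",
    "graph" -> "ldbc_snb",
    "dbtable" -> "job load_ldbc_snb",
    "filename" -> "v_comment_file",
    "sep" -> "|",
    "eol" -> "\n",
    "batchsize" -> "10000",
    "jobid" -> "load_ldbc_snb.2023-11-03",  // ID under which batch statistics are aggregated
    "max_num_error" -> "100",               // abort after 100 error objects (requires jobid)
    "max_percent_error" -> "5"))            // abort when 5% of objects are errors (requires jobid)
  .save()
----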

*For the `batchsize` option:*

* *If it is set too small*, a lot of time is spent setting up connections.
* *If it is set too large*, the HTTP payload may exceed the limit (the default TigerGraph REST++ maximum payload size is 128 MB). Furthermore, a large `batchsize` may result in high performance jitter.

To avoid a disk I/O bottleneck, it is better to put the raw data files on a different disk from the one used by TigerGraph.

== Configuration Options with Spark

[cols="4"]
|===
| Property Name | Default | Meaning | Required

| `driver` | (none) | The fully qualified class name (FQCN) of the JDBC driver: `com.tigergraph.jdbc.Driver`. | Yes
| `url` | (none) | The JDBC URL to connect to: `jdbc:tg:http(s)://ip:port`; the port is the one used by GraphStudio. | Yes
| `graph` | (none) | The graph name. | Yes
| `version` | 3.9.0 | The TigerGraph version. | Yes
| `username` | tigergraph | TigerGraph username. | If xref:tigergraph-server:user-access:enabling-user-authentication.adoc[REST++ authentication] is enabled, a username/password or token is required.
| `password` | tigergraph | TigerGraph password. | If xref:tigergraph-server:user-access:enabling-user-authentication.adoc[REST++ authentication] is enabled, a username/password or token is required.
| `token` | (none) | A token used to authenticate RESTPP requests. See xref:tigergraph-server:user-access:enabling-user-authentication.adoc[REST++ authentication] for how to request a token. | If xref:tigergraph-server:user-access:enabling-user-authentication.adoc[REST++ authentication] is enabled, a username/password or token is required.
| `jobid` (TG version >= 3.9.0) | (none) | A unique ID for tracing aggregated loading statistics. | No
| `max_num_error` (TG version >= 3.9.0) | (none) | The threshold for the number of error objects within the `jobid`. The loading job is aborted when the limit is reached. `jobid` is required. | No
| `max_percent_error` (TG version >= 3.9.0) | (none) | The threshold for the percentage of error objects within the `jobid`. The loading job is aborted when the limit is reached. `jobid` is required. | No
| `filename` | (none) | The filename defined in the loading job. | Yes
| `sep` | (none) | Column separator, e.g., `,`. | Yes
| `eol` | (none) | Line separator, e.g., `\n`. | Yes
| `dbtable` | (none) | The operation specification, in the form `operation_type operation_object`. For a loading job: `job JOB_NAME`; for querying loading statistics: `jobid JOB_ID`. | Yes
| `batchsize` | 1000 | Maximum number of lines per POST request. | Yes
| `debug` | 2 | Log level: 0 → ERROR, 1 → WARN, 2 → INFO, 3 → DEBUG. | Yes
| `logFilePattern` | (none) | The log file name pattern, e.g., `/tmp/tigergraph-jdbc-driver.log`. If not given, the log is printed to stderr. | No
| `ip_list` | (none) | A comma-separated string of TigerGraph node IP addresses, which can be used for load balancing. E.g., `192.168.0.50,192.168.0.51,192.168.0.52`. | No
| `trustStore` | (none) | Filename of the truststore which stores the SSL certificate. Add `--files /path/to/trust.jks` when submitting the Spark job. | No
| `trustStorePassword` | (none) | Password of the truststore. | No
| `trustStoreType` | (none) | Truststore type, e.g., JKS. | No
| `sslHostnameVerification` | true | Whether to verify that the hostname in the URL matches the hostname in the certificate. | No
| `queryTimeout` | RESTPP.Factory.DefaultQueryTimeoutSec | The timeout in seconds for REST++ requests. | No
| `connectTimeout` | 30 | The connect timeout in seconds for the HTTP client. | No
|===
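
For illustration, the following is a hedged sketch that combines several of the optional properties from the table above; the IP addresses, timeout values, and file path are placeholders.

.Example: optional connection properties (sketch)
[source, gsql]
----
// Placeholder values; each property is described in the configuration table above.
df.write.mode("append").format("jdbc").options(
  Map(
    "driver" -> "com.tigergraph.jdbc.Driver",
    "url" -> "jdbc:tg:http://192.168.0.50:14240",
    "username" -> "tigergraph",
    "password" -> "tigergraph",
    "graph" -> "Social",
    "dbtable" -> "job load_Social",
    "filename" -> "file1",
    "sep" -> ",",
    "eol" -> "\n",
    "ip_list" -> "192.168.0.50,192.168.0.51,192.168.0.52", // spread requests across nodes
    "queryTimeout" -> "600",                                // REST++ request timeout (s)
    "connectTimeout" -> "30",                               // HTTP connect timeout (s)
    "logFilePattern" -> "/tmp/tigergraph-jdbc-driver.log")) // log to file instead of stderr
  .save()
----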

== Specific Usages for a Spark DataFrame in TigerGraph

=== Bulk Load

To read a full Delta table as a Spark DataFrame and bulk load it into TigerGraph:

[source, gsql]
----
val df = spark.read.format("delta").load("/tmp/delta-table")
df.write.mode("overwrite").format("jdbc").options(
  Map(
    "driver" -> "com.tigergraph.jdbc.Driver",
    "url" -> "jdbc:tg:http://host:port",
    ...))
  .save()
----

=== Capture Changes in Batch Queries

. Enable the change data feed option on the Delta table:
+
[source, gsql]
----
ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
----

. Capture the changes into a DataFrame:
+
.Example 1: Version as type `int` or `long`:
[source, gsql]
----
val df = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")
----
+
.Example 2: Timestamps formatted as `timestamp` strings:
[source, gsql]
----
val df = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")
----
+
.Example 3: Providing only the starting version/timestamp:
[source, gsql]
----
val df = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")
----
+
.Example 4: Path to table:
[source, gsql]
----
val df = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")
----

. Filter the changes:
+
The DataFrame containing the changes of the Delta table has https://docs.databricks.com/en/delta/delta-change-data-feed.html#what-is-the-schema-for-the-change-data-feed[3 additional columns]:
+
[cols="3"]
|===
|Column name |Type | Values
| `_change_type` | String | insert, update_preimage, update_postimage, delete
| `_commit_version` | Long | The Delta log or table version containing the change.
| `_commit_timestamp` | Timestamp | The timestamp associated with the commit.
|===
+
A TigerGraph GSQL loading job only supports inserts and updates, so the result needs to be filtered:
+
[source, gsql]
----
df.filter($"_change_type" === "insert" || $"_change_type" === "update_postimage")
----

. Select the original data columns:
+
[source, gsql]
----
df.select("_c0", "_c1", "_c2")
----

. Write the DataFrame to TigerGraph:
+
[source, gsql]
----
df.write.mode("overwrite").format("jdbc").options(
  Map(
    "driver" -> "com.tigergraph.jdbc.Driver",
    "url" -> "jdbc:tg:http://host:port",
    ...))
  .save()
----

=== Full Example

Below is a full example of the previous steps with more options.

.Capture changes and filter for insertions and updates:
[source, gsql]
----
val df = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")
  .filter($"_change_type" === "insert" || $"_change_type" === "update_postimage")
----

Note that the TigerGraph JDBC Driver has more use cases than just the Spark connection. You can also use the TigerGraph JDBC Driver in your Java and Python applications; it is an open-source project. (A minimal non-Spark sketch follows at the end of this section.)

.Now write the changes from the DataFrame to TigerGraph:
[source, gsql]
----
df.write.mode("overwrite").format("jdbc").options(
  Map(
    "driver" -> "com.tigergraph.jdbc.Driver",
    "url" -> "jdbc:tg:http://host:port",
    "username" -> "tigergraph",
    "password" -> "tigergraph",
    "token" -> "token",
    "graph" -> "Social", // graph name
    "dbtable" -> "job load_Social", // loading job name, "job" is required
    "filename" -> "file1", // filename defined in the loading job
    "sep" -> ",", // separator between columns
    "eol" -> "\n", // end of line
    "batchsize" -> "10000"))
  .save()
----
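
As noted above, the driver is not limited to Spark. Below is a minimal non-Spark sketch, written here in Scala against the standard `java.sql` API, of opening a plain JDBC connection with the same driver class and URL format; the connection properties come from the configuration table, and the statement syntax for querying TigerGraph is not shown here. See the driver README for the supported query forms.

[source, gsql]
----
// Sketch only: opening a plain JDBC connection to TigerGraph outside Spark.
import java.sql.DriverManager
import java.util.Properties

val props = new Properties()
props.put("username", "tigergraph")   // properties as described in the configuration table
props.put("password", "tigergraph")
props.put("graph", "Social")

// Same driver class and URL format as the Spark examples above.
Class.forName("com.tigergraph.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:tg:http://host:port", props)
try {
  // Issue statements through conn.createStatement(); see the driver README for query syntax.
} finally {
  conn.close()
}
----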
