spark factor generator #79

Merged · 2 commits · May 14, 2024
3 changes: 2 additions & 1 deletion .gitignore
@@ -4,6 +4,7 @@
target/
.idea/
.vscode/
.metals/

# local generated data
out*/
@@ -16,4 +17,4 @@ tune.log

sf*/
sf*.tar
sf*.tar.gz
sf*.tar.gz
24 changes: 17 additions & 7 deletions scripts/paramgen/parameter_curation.py
@@ -9,13 +9,23 @@
import os
import codecs
from datetime import date
from glob import glob

THRESH_HOLD = 0
TRUNCATION_LIMIT = 10000

def process_csv(file_path):
    df = pd.read_csv(file_path, delimiter='|')
    return df
    all_files = glob(file_path + '/*.csv')

    df_list = []

    for filename in all_files:
        df = pd.read_csv(filename, delimiter='|')
        df_list.append(df)

    combined_df = pd.concat(df_list, ignore_index=True)

    return combined_df


class CSVSerializer:
@@ -118,11 +128,11 @@ def handleTimeDurationParam(timeParam):

def main():

    loan_account_path = '../../out/factor_table/loan_account_list.csv'
    account_account_path = '../../out/factor_table/account_items.csv'
    account_amount_path = '../../out/factor_table/amount.csv'
    amount_bucket_path = '../../out/factor_table/amount_bucket.csv'
    time_bucket_path = '../../out/factor_table/month.csv'
    loan_account_path = '../../out/factor_table/loan_account_list'
    account_account_path = '../../out/factor_table/account_items'
    account_amount_path = '../../out/factor_table/amount'
    amount_bucket_path = '../../out/factor_table/amount_bucket'
    time_bucket_path = '../../out/factor_table/month'
    output_path = '../../out/substitute_parameters/'


2 changes: 1 addition & 1 deletion src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala
@@ -30,7 +30,7 @@ object LdbcDatagen extends SparkApp with Logging {
format: String = "csv",
formatOptions: Map[String, String] = Map.empty,
epochMillis: Boolean = false,
generateFactors: Boolean = false,
generateFactors: Boolean = true,
factorFormat: String = "parquet"
)

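The only functional change in this file is the generateFactors default flipping from false to true, so factor tables are now produced unless a caller explicitly opts out. A minimal, self-contained sketch of what that default means for callers; the surrounding object is hypothetical and only the Args fields mirror this hunk:

// Hypothetical illustration; only the field defaults mirror the diff above.
case class Args(
    format: String = "csv",
    formatOptions: Map[String, String] = Map.empty,
    epochMillis: Boolean = false,
    generateFactors: Boolean = true, // default flipped in this PR
    factorFormat: String = "parquet"
)

object GenerateFactorsDefaultExample {
  def main(argv: Array[String]): Unit = {
    val defaults = Args()                         // factor generation now on by default
    val optedOut = Args(generateFactors = false)  // callers must disable it explicitly
    println(s"default=${defaults.generateFactors}, optedOut=${optedOut.generateFactors}")
  }
}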
60 changes: 60 additions & 0 deletions src/main/scala/ldbc/finbench/datagen/factors/AccountItemsGenerator.scala
@@ -0,0 +1,60 @@
package ldbc.finbench.datagen.factors

import org.apache.spark.sql.{SparkSession, functions => F}
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.lit


object AccountItemsGenerator {
  def generateAccountItems(implicit spark: SparkSession): Unit = {
    import spark.implicits._

    val accountRDD = spark.read
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .option("header", "true")
      .option("delimiter", "|")
      .load("./out/raw/account/*.csv")

    val transferRDD = spark.read
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .option("header", "true")
      .option("delimiter", "|")
      .load("./out/raw/transfer/*.csv")

    val withdrawRDD = spark.read
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .option("header", "true")
      .option("delimiter", "|")
      .load("./out/raw/withdraw/*.csv")

    val combinedRDD = transferRDD.select($"fromId", $"toId", $"amount".cast("double"))
      .union(withdrawRDD.select($"fromId", $"toId", $"amount".cast("double")))

    val maxAmountRDD = combinedRDD.groupBy($"fromId", $"toId")
      .agg(max($"amount").alias("maxAmount"))

    val accountItemsRDD = maxAmountRDD.groupBy($"fromId")
      .agg(F.collect_list(F.array($"toId", $"maxAmount")).alias("items"))
      .select($"fromId".alias("account_id"), $"items")
      .sort($"account_id")

    val transformedAccountItemsRDD = accountItemsRDD.withColumn(
      "items",
      F.expr("transform(items, array -> concat('[', concat_ws(',', array), ']'))")
    ).withColumn(
      "items",
      F.concat_ws(",", $"items")
    ).withColumn(
      "items",
      F.concat(lit("["), $"items", lit("]"))
    )

    transformedAccountItemsRDD
      .coalesce(1)
      .write
      .option("header", "true")
      .option("delimiter", "|")
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .save("./out/factor_table/account_items")
  }
}
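For reference, a minimal sketch of driving this new generator on its own. The local SparkSession setup is illustrative only; in the pipeline the session comes from the datagen driver:

import org.apache.spark.sql.SparkSession
import ldbc.finbench.datagen.factors.AccountItemsGenerator

object AccountItemsExample {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration; the input/output paths are the ones
    // hard-coded in the generator above.
    implicit val spark: SparkSession = SparkSession.builder()
      .appName("account-items-example")
      .master("local[*]")
      .getOrCreate()

    // Reads ./out/raw/{account,transfer,withdraw}/*.csv and writes the
    // ./out/factor_table/account_items factor table described in this file.
    AccountItemsGenerator.generateAccountItems

    spark.stop()
  }
}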
223 changes: 155 additions & 68 deletions src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala
@@ -1,15 +1,31 @@
package ldbc.finbench.datagen.factors

import ldbc.finbench.datagen.util.DatagenStage
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{SparkSession, functions => F}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, countDistinct, row_number, var_pop}
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.functions.first
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.functions.collect_set
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.functions.array
import org.apache.spark.sql.functions.coalesce
import org.apache.spark.sql.functions.array_join
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.functions.format_string
import org.apache.spark.sql.types.{ArrayType, StringType}
import org.graphframes.GraphFrame
import org.slf4j.{Logger, LoggerFactory}
import scopt.OptionParser
import shapeless.lens

import scala.util.matching.Regex
import ldbc.finbench.datagen.factors.AccountItemsGenerator

object FactorGenerationStage extends DatagenStage {

@@ -69,82 +85,153 @@ object FactorGenerationStage extends DatagenStage {
}

def parameterCuration(implicit spark: SparkSession) = {
import spark.implicits._

val accountDf = spark.read.format("csv")
// AccountItemsGenerator.generateAccountItems
val accountRDD = spark.read
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.option("header", "true")
.option("delimiter", "|")
.load("./out/account/*.csv")
// .toDF("_id","createTime","deleteTime","isBlocked","type","inDegree","OutDegree","isExplicitDeleted","Owner")
.load("./out/raw/account/*.csv")

val transferDf = spark.read.format("csv")
val transferRDD = spark.read
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.option("header", "true")
.option("delimiter", "|")
.csv("./out/transfer/*.csv")
.toDF("src", "dst", "multiplicityI", "createTime", "deleteTime", "amount", "isExplicitDeleted")

val graph = GraphFrame(accountDf, transferDf)

val accountTransferAccountOut1Hops = graph.find("(a)-[e1]->(b)")
.filter("e1.src == a.id")
.groupBy("a.id")
.agg(countDistinct("b.id").alias("AccountTransferAccountOut1Hops"))

val accountTransferAccountOut2Hops = graph.find("(a)-[e1]->(b); (b)-[e2]->(c)")
.filter("e1.src == a.id and e2.src == b.id")
.groupBy("a.id")
.agg(countDistinct("c.id").alias("AccountTransferAccountOut2Hops"))

val accountTransferAccountOut3Hops = graph.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(d)")
.filter("e1.src == a.id and e2.src == b.id and e3.src == c.id")
.groupBy("a.id")
.agg(countDistinct("d.id").alias("AccountTransferAccountOut3Hops"))

val factorTable = accountTransferAccountOut1Hops
.join(accountTransferAccountOut2Hops.join(accountTransferAccountOut3Hops, "id"), "id")

val w1 = Window.orderBy(col("AccountTransferAccountOut1Hops")).rowsBetween(-100000, 0)
val variance1 = var_pop(col("AccountTransferAccountOut1Hops")).over(w1)
val df1 = accountTransferAccountOut1Hops
.withColumn("variance1", variance1)
.filter(col("AccountTransferAccountOut1Hops") > 1)
.withColumn("row_number1", row_number.over(Window.orderBy("variance1")))
.filter(col("row_number1") <= 100000)


val w2 = Window.orderBy(col("AccountTransferAccountOut2Hops")).rowsBetween(-10000, 0)
val variance2 = var_pop(col("AccountTransferAccountOut2Hops")).over(w2)
val df2 = df1
.join(accountTransferAccountOut2Hops, "id")
.withColumn("variance2", variance2)
.filter(col("AccountTransferAccountOut2Hops") > 10)
.withColumn("row_number2", row_number.over(Window.orderBy("variance2")))
.filter(col("row_number2") <= 10000)

val w3 = Window.orderBy(col("AccountTransferAccountOut3Hops")).rowsBetween(-1000, 0)
val variance3 = var_pop(col("AccountTransferAccountOut3Hops")).over(w3)
val paramTable = df2
.join(accountTransferAccountOut3Hops, "id")
.withColumn("variance3", variance3)
.filter(col("AccountTransferAccountOut3Hops") > 100)
.withColumn("row_number3", row_number.over(Window.orderBy("variance3")))
.filter(col("row_number3") <= 1000)
.select("id", "AccountTransferAccountOut1Hops", "AccountTransferAccountOut2Hops", "AccountTransferAccountOut3Hops")

factorTable.coalesce(1).write
.format("csv")
.option("header", value = true)
.load("./out/raw/transfer/*.csv")

val withdrawRDD = spark.read
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.option("header", "true")
.option("delimiter", "|")
.load("./out/raw/withdraw/*.csv")

val combinedRDD = transferRDD.select($"fromId", $"toId", $"amount".cast("double"))
.union(withdrawRDD.select($"fromId", $"toId", $"amount".cast("double")))

val transactionsRDD = transferRDD.select(col("fromId"), col("createTime"))
.union(withdrawRDD.select(col("fromId"), col("createTime")))

val transactionsByMonthRDD = transactionsRDD
.withColumn("year_month", date_format((col("createTime") / 1000).cast("timestamp"), "yyyy-MM"))
.groupBy("fromId", "year_month")
.count()

val pivotRDD = transactionsByMonthRDD.groupBy("fromId")
.pivot("year_month")
.agg(first("count"))
.na.fill(0)
.withColumnRenamed("fromId", "account_id")

val maxAmountRDD = combinedRDD.groupBy($"fromId", $"toId")
.agg(max($"amount").alias("maxAmount"))

val accountItemsRDD = maxAmountRDD.groupBy($"fromId")
.agg(F.collect_list(F.array($"toId", $"maxAmount")).alias("items"))
.select($"fromId".alias("account_id"), $"items")
.sort($"account_id")

val transformedAccountItemsRDD = accountItemsRDD.withColumn(
"items",
F.expr("transform(items, array -> concat('[', concat_ws(',', array), ']'))")
).withColumn(
"items",
F.concat_ws(",", $"items")
).withColumn(
"items",
F.concat(lit("["), $"items", lit("]"))
)

val transactionsAmountRDD = transferRDD.select(col("fromId"), col("amount").cast("double"))
.union(withdrawRDD.select(col("fromId"), col("amount").cast("double")))

val buckets = Array(10000, 30000, 100000, 300000, 1000000, 2000000, 3000000, 4000000, 5000000, 6000000, 7000000, 8000000, 9000000, 10000000)

val bucketRDD = transactionsAmountRDD.withColumn("bucket",
when(col("amount") <= buckets(0), buckets(0))
.when(col("amount") <= buckets(1), buckets(1))
.when(col("amount") <= buckets(2), buckets(2))
.when(col("amount") <= buckets(3), buckets(3))
.when(col("amount") <= buckets(4), buckets(4))
.when(col("amount") <= buckets(5), buckets(5))
.when(col("amount") <= buckets(6), buckets(6))
.when(col("amount") <= buckets(7), buckets(7))
.when(col("amount") <= buckets(8), buckets(8))
.when(col("amount") <= buckets(9), buckets(9))
.when(col("amount") <= buckets(10), buckets(10))
.when(col("amount") <= buckets(11), buckets(11))
.when(col("amount") <= buckets(12), buckets(12))
.otherwise(buckets(13))
)

val bucketCountsRDD = bucketRDD.groupBy("fromId")
.pivot("bucket", buckets.map(_.toString))
.count()
.na.fill(0)
.withColumnRenamed("fromId", "account_id")

val loanRDD = spark.read
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.option("header", "true")
.option("delimiter", "|")
.load("./out/raw/loan/*.csv")

val depositRDD = spark.read
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.option("header", "true")
.option("delimiter", "|")
.option("encoding", "UTF-8")
.mode("overwrite")
.save("./out/factorTable/")
.load("./out/raw/deposit/*.csv")

val loanAccountListRDD = loanRDD.join(depositRDD, loanRDD("id") === depositRDD("loanId"), "left_outer")
.groupBy("id")
.agg(coalesce(collect_set("accountId"), array()).alias("account_list"))
.select(col("id").alias("loan_id"), concat(lit("["), array_join(col("account_list"), ","), lit("]")).alias("account_list"))

val transactionsSumRDD = transferRDD.select(col("toId"), col("amount").cast("double"))
.union(withdrawRDD.select(col("toId"), col("amount").cast("double")))

val amountSumRDD = transactionsSumRDD.groupBy("toId")
.agg(sum("amount").alias("amount"))
.withColumnRenamed("toId", "account_id")
.select(col("account_id"), format_string("%.2f", col("amount")).alias("amount"))

amountSumRDD
.write
.option("header", "true")
.option("delimiter", "|")
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save("./out/factor_table/amount")

paramTable.coalesce(1).write
.format("csv")
.option("header", value = true)
loanAccountListRDD
.write
.option("header", "true")
.option("delimiter", "|")
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save("./out/factor_table/loan_account_list")

transformedAccountItemsRDD
.coalesce(1)
.write
.option("header", "true")
.option("delimiter", "|")
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save("./out/factor_table/account_items")


pivotRDD
.write
.option("header", "true")
.option("delimiter", "|")
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save("./out/factor_table/month")

bucketCountsRDD
.write
.option("header", "true")
.option("delimiter", "|")
.option("encoding", "UTF-8")
.mode("overwrite")
.save("./out/paramTable/")
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save("./out/factor_table/amount_bucket")


}
}
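A side note on the amount-bucket column above: the fourteen hand-written when(...) branches assign each amount to the smallest threshold that is greater than or equal to it, falling back to the largest bucket. A behaviour-equivalent sketch that folds over the same thresholds, shown for illustration only and not part of this change:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

object BucketColumnSketch {
  // Thresholds copied from the diff above.
  val buckets: Array[Int] = Array(10000, 30000, 100000, 300000, 1000000, 2000000, 3000000,
    4000000, 5000000, 6000000, 7000000, 8000000, 9000000, 10000000)

  // Fold from the largest threshold down so the smallest matching threshold ends up
  // as the outermost when(...), which is exactly what the explicit chain evaluates first.
  val bucketColumn: Column = buckets.init.reverse.foldLeft(lit(buckets.last): Column) {
    (fallback, threshold) => when(col("amount") <= threshold, lit(threshold)).otherwise(fallback)
  }
  // Usage would mirror the diff: transactionsAmountRDD.withColumn("bucket", bucketColumn)
}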