From 9f9395da7d325b4dc34a6500630b6c7df82dfdd9 Mon Sep 17 00:00:00 2001 From: qishipengqsp Date: Mon, 24 Jun 2024 11:08:23 +0800 Subject: [PATCH] remove count --- .../datagen/generation/ActivitySimulator.scala | 10 ++-------- .../generation/generators/ActivityGenerator.scala | 1 + todos.md | 8 +++++++- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala b/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala index d5fe6e53..ebf68589 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala @@ -43,9 +43,7 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession) extends Wri log.info(s"[Simulation] Company RDD partitions: ${companyRdd.getNumPartitions}") log.info(s"[Simulation] Medium RDD partitions: ${mediumRdd.getNumPartitions}") - // ========================================= // Person and company related activities - // ========================================= val personWithAccounts = activityGenerator.personRegisterEvent(personRdd) // simulate person register event log.info(s"[Simulation] personWithAccounts partitions: ${personWithAccounts.getNumPartitions}") val companyWithAccounts = activityGenerator.companyRegisterEvent(companyRdd) // simulate company register event @@ -64,14 +62,12 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession) extends Wri // simulate person apply loans event and company apply loans event val personWithAccGuaLoan = activityGenerator.personLoanEvent(personWithAccGua).cache() val companyWithAccGuaLoan = activityGenerator.companyLoanEvent(companyWitAccGua).cache() - assert(personWithAccGuaLoan.count() == personRdd.count()) - assert(companyWithAccGuaLoan.count() == companyRdd.count()) +// assert(personWithAccGuaLoan.count() == personRdd.count()) +// assert(companyWithAccGuaLoan.count() == 
companyRdd.count()) log.info(s"[Simulation] personWithAccGuaLoan partitions: ${personWithAccGuaLoan.getNumPartitions}") log.info(s"[Simulation] companyWithAccGuaLoan partitions: ${companyWithAccGuaLoan.getNumPartitions}") - // ========================================= // Account related activities - // ========================================= val accountRdd = mergeAccounts(personWithAccounts, companyWithAccounts) // merge log.info(s"[Simulation] Account RDD partitions: ${accountRdd.getNumPartitions}") val signInRdd = activityGenerator.signInEvent(mediumRdd, accountRdd) // simulate signIn @@ -91,9 +87,7 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession) extends Wri s"repays RDD partitions: ${repaysRdd.getNumPartitions}, " + s"loanTrasfers RDD partitions: ${loanTrasfersRdd.getNumPartitions}") - // ========================================= // Serialize - // ========================================= val allFutures = activitySerializer.writePersonWithActivities(personWithAccGuaLoan) ++ activitySerializer.writeCompanyWithActivities(companyWithAccGuaLoan) ++ activitySerializer.writeMediumWithActivities(mediumRdd, signInRdd) ++ diff --git a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala index 1285fddf..25f78f15 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala @@ -80,6 +80,7 @@ class ActivityGenerator()(implicit spark: SparkSession) extends Serializable wit val personEitherRDD: RDD[EitherPersonOrCompany] = personRDD.map(person => Left(person)) val companyEitherRDD: RDD[EitherPersonOrCompany] = companyRDD.map(company => Right(company)) val mergedEither = personEitherRDD.union(companyEitherRDD).collect().toList +// val mergedEither = personEitherRDD.union(companyEitherRDD) // TODO: optimize the Spark 
process when large scale investedCompanyRDD.flatMap(investedCompany => { diff --git a/todos.md b/todos.md index f52cfde1..5c1c7cfc 100644 --- a/todos.md +++ b/todos.md @@ -3,4 +3,10 @@ ⁃ 打断点调试的方法?【问】@py ⁃ RDD、job命名区分 @qsp ⁃ futures写法是否确实能并行(看Spark UI) -⁃ 与SNB datagen对比 \ No newline at end of file +⁃ 与SNB datagen对比 + + +## tricks + +- 开启history server: https://stackoverflow.com/questions/28675460/how-to-keep-the-spark-web-ui-alive +- \ No newline at end of file