Skip to content

Commit

Permalink
remove count
Browse files Browse the repository at this point in the history
  • Loading branch information
qishipengqsp committed Jun 24, 2024
1 parent 9d5b4ca commit 9f9395d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession) extends Wri
log.info(s"[Simulation] Company RDD partitions: ${companyRdd.getNumPartitions}")
log.info(s"[Simulation] Medium RDD partitions: ${mediumRdd.getNumPartitions}")

// =========================================
// Person and company related activities
// =========================================
val personWithAccounts = activityGenerator.personRegisterEvent(personRdd) // simulate person register event
log.info(s"[Simulation] personWithAccounts partitions: ${personWithAccounts.getNumPartitions}")
val companyWithAccounts = activityGenerator.companyRegisterEvent(companyRdd) // simulate company register event
Expand All @@ -64,14 +62,12 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession) extends Wri
// simulate person apply loans event and company apply loans event
val personWithAccGuaLoan = activityGenerator.personLoanEvent(personWithAccGua).cache()
val companyWithAccGuaLoan = activityGenerator.companyLoanEvent(companyWitAccGua).cache()
assert(personWithAccGuaLoan.count() == personRdd.count())
assert(companyWithAccGuaLoan.count() == companyRdd.count())
// assert(personWithAccGuaLoan.count() == personRdd.count())
// assert(companyWithAccGuaLoan.count() == companyRdd.count())
log.info(s"[Simulation] personWithAccGuaLoan partitions: ${personWithAccGuaLoan.getNumPartitions}")
log.info(s"[Simulation] companyWithAccGuaLoan partitions: ${companyWithAccGuaLoan.getNumPartitions}")

// =========================================
// Account related activities
// =========================================
val accountRdd = mergeAccounts(personWithAccounts, companyWithAccounts) // merge
log.info(s"[Simulation] Account RDD partitions: ${accountRdd.getNumPartitions}")
val signInRdd = activityGenerator.signInEvent(mediumRdd, accountRdd) // simulate signIn
Expand All @@ -91,9 +87,7 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession) extends Wri
s"repays RDD partitions: ${repaysRdd.getNumPartitions}, " +
s"loanTrasfers RDD partitions: ${loanTrasfersRdd.getNumPartitions}")

// =========================================
// Serialize
// =========================================
val allFutures = activitySerializer.writePersonWithActivities(personWithAccGuaLoan) ++
activitySerializer.writeCompanyWithActivities(companyWithAccGuaLoan) ++
activitySerializer.writeMediumWithActivities(mediumRdd, signInRdd) ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class ActivityGenerator()(implicit spark: SparkSession) extends Serializable wit
val personEitherRDD: RDD[EitherPersonOrCompany] = personRDD.map(person => Left(person))
val companyEitherRDD: RDD[EitherPersonOrCompany] = companyRDD.map(company => Right(company))
val mergedEither = personEitherRDD.union(companyEitherRDD).collect().toList
// val mergedEither = personEitherRDD.union(companyEitherRDD)

// TODO: optimize this Spark step for large-scale data (e.g. avoid collecting the merged RDD to the driver)
investedCompanyRDD.flatMap(investedCompany => {
Expand Down
8 changes: 7 additions & 1 deletion todos.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,10 @@
⁃ 打断点调试的方法?【问】@py
⁃ RDD、job命名区分 @qsp
⁃ futures写法是否确实能并行(看Spark UI)
⁃ 与SNB datagen对比
⁃ 与SNB datagen对比


## tricks

- 开启 history server: https://stackoverflow.com/questions/28675460/how-to-keep-the-spark-web-ui-alive
-

0 comments on commit 9f9395d

Please sign in to comment.