Skip to content

Commit

Permalink
Merge branch 'dev' into dev_qsp
Browse files Browse the repository at this point in the history
  • Loading branch information
qishipengqsp authored Jul 18, 2024
2 parents 3101261 + 6f902f1 commit 1c7a74a
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 59 deletions.
7 changes: 7 additions & 0 deletions scripts/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ def run_local(

final_spark_conf = {
**({'spark.default.parallelism': str(parallelism)} if parallelism else {}),
**({'spark.shuffle.compress': 'true'}),
**({'spark.shuffle.spill.compress': 'true'}),
**({'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'}),
**({'spark.executor.extraJavaOptions': '-XX:+UseG1GC'}),
**({'spark.storage.memoryMapThreshold': '512m'}),
**({'spark.sql.inMemoryColumnarStorage.compressed': 'true'}),
**({'spark.sql.inMemoryColumnarStorage.batchSize': '1000000'}),
**spark_conf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
PersonRaw(p.getPersonId, p.getCreationDate, p.getPersonName, p.isBlocked,
p.getGender, p.getBirthday, p.getCountryName, p.getCityName)
}
spark.createDataFrame(rawPersons).write.format(sink.format.toString).options(options)
.save((pathPrefix / "person").toString)
rawPersons.coalesce(1).saveAsTextFile((pathPrefix / "person").toString)
},
SparkUI.jobAsync("Write Person", "Write Person own account") {
val rawPersonOwnAccount = self.flatMap { p =>
Expand All @@ -37,8 +36,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
poa.getCreationDate, poa.getDeletionDate, poa.isExplicitlyDeleted)
}
}
spark.createDataFrame(rawPersonOwnAccount).write.format(sink.format.toString).options(options)
.save((pathPrefix / "personOwnAccount").toString)
rawPersonOwnAccount.coalesce(1).saveAsTextFile((pathPrefix / "personOwnAccount").toString)
},
SparkUI.jobAsync("Write Person", "Write Person guarantee") {
val rawPersonGuarantee = self.flatMap { p =>
Expand All @@ -49,8 +47,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
pgp.getCreationDate, pgp.getRelationship)
}
}
spark.createDataFrame(rawPersonGuarantee).write.format(sink.format.toString).options(options)
.save((pathPrefix / "personGuarantee").toString)
rawPersonGuarantee.coalesce(1).saveAsTextFile((pathPrefix / "personGuarantee").toString)
},
SparkUI.jobAsync("Write Person", "Write Person apply loan") {
val rawPersonLoan = self.flatMap { p =>
Expand All @@ -61,8 +58,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
pal.getCreationDate, pal.getOrganization)
}
}
spark.createDataFrame(rawPersonLoan).write.format(sink.format.toString).options(options)
.save((pathPrefix / "personApplyLoan").toString)
rawPersonLoan.coalesce(1).saveAsTextFile((pathPrefix / "personApplyLoan").toString)
}
)
futures
Expand All @@ -76,8 +72,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
CompanyRaw(c.getCompanyId, c.getCreationDate, c.getCompanyName, c.isBlocked,
c.getCountryName, c.getCityName, c.getBusiness, c.getDescription, c.getUrl)
}
spark.createDataFrame(rawCompanies).write.format(sink.format.toString).options(options)
.save((pathPrefix / "company").toString)
rawCompanies.coalesce(1).saveAsTextFile((pathPrefix / "company").toString)
},
SparkUI.jobAsync("Write Company", "Write Company own account") {
val rawCompanyOwnAccount = self.flatMap { c =>
Expand All @@ -86,8 +81,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
coa.getCreationDate, coa.getDeletionDate, coa.isExplicitlyDeleted)
}
}
spark.createDataFrame(rawCompanyOwnAccount).write.format(sink.format.toString).options(options)
.save((pathPrefix / "companyOwnAccount").toString)
rawCompanyOwnAccount.coalesce(1).saveAsTextFile((pathPrefix / "companyOwnAccount").toString)
},
SparkUI.jobAsync("Write Company", "Write Company guarantee") {
val rawCompanyGuarantee = self.flatMap { c =>
Expand All @@ -98,8 +92,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
cgc.getCreationDate, cgc.getRelationship)
}
}
spark.createDataFrame(rawCompanyGuarantee).write.format(sink.format.toString).options(options)
.save((pathPrefix / "companyGuarantee").toString)
rawCompanyGuarantee.coalesce(1).saveAsTextFile((pathPrefix / "companyGuarantee").toString)
},
SparkUI.jobAsync("Write Company", "Write Company apply loan") {
val rawCompanyLoan = self.flatMap { c =>
Expand All @@ -110,8 +103,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
formattedDouble(cal.getLoan.getLoanAmount), cal.getCreationDate, cal.getOrganization)
}
}
spark.createDataFrame(rawCompanyLoan).write.format(sink.format.toString).options(options)
.save((pathPrefix / "companyApplyLoan").toString)
rawCompanyLoan.coalesce(1).saveAsTextFile((pathPrefix / "companyApplyLoan").toString)
}
)
futures
Expand All @@ -125,15 +117,13 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
MediumRaw(m.getMediumId, m.getCreationDate, m.getMediumName, m.isBlocked,
m.getLastLogin, m.getRiskLevel)
}
spark.createDataFrame(rawMedium).write.format(sink.format.toString).options(options)
.save((pathPrefix / "medium").toString)
rawMedium.coalesce(1).saveAsTextFile((pathPrefix / "medium").toString)

val rawSignIn = signIns.map { si: SignIn =>
SignInRaw(si.getMedium.getMediumId, si.getAccount.getAccountId,
si.getMultiplicityId, si.getCreationDate, si.getDeletionDate, si.isExplicitlyDeleted, si.getLocation)
}
spark.createDataFrame(rawSignIn).write.format(sink.format.toString).options(options)
.save((pathPrefix / "signIn").toString)
rawSignIn.coalesce(1).saveAsTextFile((pathPrefix / "signIn").toString)
},
)
futures
Expand All @@ -148,17 +138,15 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
a.getLastLoginTime, a.getAccountLevel, a.getMaxInDegree, a.getMaxOutDegree, a.isExplicitlyDeleted,
a.getOwnerType.toString)
}
spark.createDataFrame(rawAccount).write.format(sink.format.toString).options(options)
.save((pathPrefix / "account").toString)
rawAccount.coalesce(1).saveAsTextFile((pathPrefix / "account").toString)

val rawTransfer = transfers.map { t =>
TransferRaw(t.getFromAccount.getAccountId, t.getToAccount.getAccountId,
t.getMultiplicityId, t.getCreationDate, t.getDeletionDate, formattedDouble(t.getAmount),
t.isExplicitlyDeleted,
t.getOrdernum, t.getComment, t.getPayType, t.getGoodsType)
}
spark.createDataFrame(rawTransfer).write.format(sink.format.toString).options(options)
.save((pathPrefix / "transfer").toString)
rawTransfer.coalesce(1).saveAsTextFile((pathPrefix / "transfer").toString)
}
)
futures
Expand All @@ -172,8 +160,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
w.getFromAccount.getType, w.getToAccount.getType, w.getMultiplicityId, w.getCreationDate,
w.getDeletionDate, formattedDouble(w.getAmount), w.isExplicitlyDeleted)
}
spark.createDataFrame(rawWithdraw).write.format(sink.format.toString).options(options)
.save((pathPrefix / "withdraw").toString)
rawWithdraw.coalesce(1).saveAsTextFile((pathPrefix / "withdraw").toString)
}
)
futures
Expand All @@ -183,18 +170,18 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se

val futures = Seq(
SparkUI.jobAsync("Write invest", "Write Person Invest") {
val personInvest = self.filter(_.isLeft).map(_.left.get)
spark.createDataFrame(personInvest.map { pic =>
val personInvest = self.filter(_.isLeft).map(_.left.get).map { pic =>
PersonInvestCompanyRaw(pic.getPerson.getPersonId, pic.getCompany.getCompanyId,
pic.getCreationDate, pic.getRatio)
}).write.format(sink.format.toString).options(options).save((pathPrefix / "personInvest").toString)
}
personInvest.coalesce(1).saveAsTextFile((pathPrefix / "personInvest").toString)
},
SparkUI.jobAsync("Write invest", "Write Company Invest") {
val companyInvest = self.filter(_.isRight).map(_.right.get)
spark.createDataFrame(companyInvest.map { cic =>
val companyInvest = self.filter(_.isRight).map(_.right.get).map { cic =>
CompanyInvestCompanyRaw(cic.getFromCompany.getCompanyId,
cic.getToCompany.getCompanyId, cic.getCreationDate, cic.getRatio)
}).write.format(sink.format.toString).options(options).save((pathPrefix / "companyInvest").toString)
}
companyInvest.coalesce(1).saveAsTextFile((pathPrefix / "companyInvest").toString)
}
)
futures
Expand All @@ -209,30 +196,26 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) extends Se
LoanRaw(l.getLoanId, l.getCreationDate, formattedDouble(l.getLoanAmount), formattedDouble(l.getBalance),
l.getUsage, f"${l.getInterestRate}%.3f")
}
spark.createDataFrame(rawLoan).write.format(sink.format.toString).options(options)
.save((pathPrefix / "loan").toString)
rawLoan.coalesce(1).saveAsTextFile((pathPrefix / "loan").toString)

val rawDeposit = deposits.map { d: Deposit =>
DepositRaw(d.getLoan.getLoanId, d.getAccount.getAccountId,
d.getCreationDate, d.getDeletionDate, formattedDouble(d.getAmount), d.isExplicitlyDeleted)
}
spark.createDataFrame(rawDeposit).write.format(sink.format.toString).options(options)
.save((pathPrefix / "deposit").toString)
}.map(_.toString)
rawDeposit.saveAsTextFile((pathPrefix / "deposit").toString)

val rawRepay = repays.map { r: Repay =>
RepayRaw(r.getAccount.getAccountId, r.getLoan.getLoanId,
r.getCreationDate, r.getDeletionDate, formattedDouble(r.getAmount), r.isExplicitlyDeleted)
}
spark.createDataFrame(rawRepay).write.format(sink.format.toString).options(options)
.save((pathPrefix / "repay").toString)
rawRepay.coalesce(1).saveAsTextFile((pathPrefix / "repay").toString)

val rawLoanTransfer = loantransfers.map { t: Transfer =>
TransferRaw(t.getFromAccount.getAccountId, t.getToAccount.getAccountId, t.getMultiplicityId, t.getCreationDate,
t.getDeletionDate, formattedDouble(t.getAmount), t.isExplicitlyDeleted, t.getOrdernum, t.getComment,
t.getPayType, t.getGoodsType)
}
spark.createDataFrame(rawLoanTransfer).write.format(sink.format.toString).options(options)
.save((pathPrefix / "loantransfer").toString)
rawLoanTransfer.coalesce(1).saveAsTextFile((pathPrefix / "loantransfer").toString)
}
)
futures
Expand Down
Loading

0 comments on commit 1c7a74a

Please sign in to comment.