Skip to content

Commit

Permalink
Merge branch 'dev' into transfer-opt
Browse files Browse the repository at this point in the history
  • Loading branch information
qishipengqsp committed Sep 18, 2024
2 parents 33ff43c + 4c95a33 commit adeb687
Show file tree
Hide file tree
Showing 19 changed files with 482 additions and 204 deletions.
5 changes: 4 additions & 1 deletion scripts/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ def run_local(
**({'spark.shuffle.spill.compress': 'true'}),
**({'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'}),
**({'spark.executor.extraJavaOptions': '-XX:+UseG1GC'}),
**({'spark.driver.maxResultSize': '5g'}),
**({'spark.driver.maxResultSize': '0'}),
**({'spark.memory.offHeap.enabled': 'true'}),
**({'spark.memory.offHeap.size': '100g'}),
**({'spark.storage.memoryFraction': '0'}),
# **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug
# **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}),
**spark_conf
Expand Down
15 changes: 8 additions & 7 deletions scripts/run_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,24 @@ echo "start: " `date`
# --conf "spark.dynamicAllocation.enabled=true" \
# --conf "spark.dynamicAllocation.minExecutors=1" \
# --conf "spark.dynamicAllocation.maxExecutors=10" \

# --conf "spark.yarn.maximizeResourceAllocation=true" \
# --conf "spark.memory.offHeap.enabled=true" \
# --conf "spark.memory.offHeap.size=100g" \
time spark-submit --master spark://finbench-large-00:7077 \
--class ldbc.finbench.datagen.LdbcDatagen \
--num-executors 2 \
--conf "spark.default.parallelism=640" \
--conf "spark.default.parallelism=800" \
--conf "spark.network.timeout=100000" \
--conf "spark.shuffle.compress=true" \
--conf "spark.shuffle.spill.compress=true" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.driver.memory=200g" \
--conf "spark.driver.maxResultSize=5g" \
--conf "spark.executor.memory=300g" \
--conf "spark.executor.memoryOverheadFactor=0.2" \
--conf "spark.driver.memory=100g" \
--conf "spark.driver.maxResultSize=0" \
--conf "spark.executor.memory=400g" \
--conf "spark.executor.memoryOverheadFactor=0.5" \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
${LDBC_FINBENCH_DATAGEN_JAR} \
--scale-factor 30 \
--scale-factor 100 \
--output-dir ${OUTPUT_DIR}

echo "End: " `date`
10 changes: 5 additions & 5 deletions scripts/run_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ OUTPUT_DIR=out
# run locally with spark-submit command
# **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug
# **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}),
# --conf "spark.memory.offHeap.enabled=true" \
# --conf "spark.memory.offHeap.size=100g" \
# --conf "spark.storage.memoryFraction=0" \
time spark-submit --master local[*] \
--class ldbc.finbench.datagen.LdbcDatagen \
--driver-memory 480g \
--conf "spark.default.parallelism=500" \
--conf "spark.shuffle.compress=true" \
--conf "spark.shuffle.spill.compress=true" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.memory.offHeap.enabled=true" \
--conf "spark.memory.offHeap.size=100g" \
--conf "spark.storage.memoryFraction=0" \
--conf "spark.driver.maxResultSize=0" \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
${LDBC_FINBENCH_DATAGEN_JAR} \
--scale-factor 10 \
--output-dir ${OUTPUT_DIR}
--scale-factor 1 \
--output-dir ${OUTPUT_DIR}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public static void createCompanyGuaranteeCompany(RandomGeneratorFarm farm, Compa
toCompany, creationDate, 0, false,
"business associate", comment);
fromCompany.getGuaranteeSrc().add(companyGuaranteeCompany);
toCompany.getGuaranteeDst().add(companyGuaranteeCompany);
}

public Company getFromCompany() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public static void createPersonGuaranteePerson(RandomGeneratorFarm farm, Person
PersonGuaranteePerson personGuaranteePerson =
new PersonGuaranteePerson(fromPerson, toPerson, creationDate, 0, false, relation, comment);
fromPerson.getGuaranteeSrc().add(personGuaranteePerson);
toPerson.getGuaranteeDst().add(personGuaranteePerson);
}

public Person getFromPerson() {
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,41 @@ public Transfer(Account fromAccount, Account toAccount, double amount, long crea
this.isExplicitlyDeleted = isExplicitlyDeleted;
}

/**
 * Creates a transfer edge between two accounts and records it on the source
 * account only (the sink side keeps no back-reference in this variant).
 *
 * @param farm           per-aspect random generator farm (keeps draws deterministic)
 * @param from           source account
 * @param to             target account
 * @param multiplicityId ordinal of this edge among parallel from->to transfers
 */
public static void createTransferNew(RandomGeneratorFarm farm, Account from, Account to,
                                     long multiplicityId) {
    // The edge can live no longer than the shorter-lived endpoint.
    long deletionDate = Math.min(from.getDeletionDate(), to.getDeletionDate());
    long creationDate = Dictionaries.dates.randomAccountToAccountDate(
        farm.get(RandomGeneratorFarm.Aspect.TRANSFER_DATE), from, to, deletionDate);
    // The edge is explicitly deleted only when both endpoints are.
    boolean explicitlyDeleted = from.isExplicitlyDeleted() && to.isExplicitlyDeleted();
    double amount = farm.get(RandomGeneratorFarm.Aspect.TRANSFER_AMOUNT).nextDouble()
        * DatagenParams.transferMaxAmount;

    Transfer transfer =
        new Transfer(from, to, amount, creationDate, deletionDate, multiplicityId, explicitlyDeleted);

    // Attach the randomly drawn payload attributes.
    transfer.setOrdernum(
        Dictionaries.numbers.generateOrdernum(farm.get(RandomGeneratorFarm.Aspect.TRANSFER_ORDERNUM)));
    transfer.setComment(
        Dictionaries.randomTexts.getUniformDistRandomTextForComments(
            farm.get(RandomGeneratorFarm.Aspect.COMMON_COMMENT)));
    transfer.setPayType(
        Dictionaries.transferTypes.getUniformDistRandomText(
            farm.get(RandomGeneratorFarm.Aspect.TRANSFER_PAYTYPE)));
    transfer.setGoodsType(
        Dictionaries.transferTypes.getUniformDistRandomText(
            farm.get(RandomGeneratorFarm.Aspect.TRANSFER_GOODSTYPE)));

    // NOTE(review): only the outgoing side records this edge here, unlike the
    // older createTransfer path — presumably intentional for transfer-opt.
    from.getTransferOuts().add(transfer);
}

public static Transfer createTransfer(RandomGeneratorFarm farm, Account from, Account to,
long multiplicityId) {
long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate());
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/ldbc/finbench/datagen/entities/edges/Withdraw.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ public Withdraw(Account fromAccount, Account toAccount, double amount, long crea
this.comment = comment;
}

/**
 * Creates a withdraw edge from an account to a card account and records it on
 * the source account's withdraw list.
 *
 * @param farm           per-aspect random generator farm
 * @param from           withdrawing account
 * @param to             receiving (card) account
 * @param multiplicityId ordinal of this edge among parallel from->to withdraws
 */
public static void createWithdrawNew(RandomGeneratorFarm farm, Account from, Account to, long multiplicityId) {
    Random dateRandom = farm.get(RandomGeneratorFarm.Aspect.WITHDRAW_DATE);
    // The edge can live no longer than the shorter-lived endpoint.
    long deletionDate = Math.min(from.getDeletionDate(), to.getDeletionDate());
    long creationDate = Dictionaries.dates.randomAccountToAccountDate(dateRandom, from, to, deletionDate);
    // Explicitly deleted only when both endpoints are explicitly deleted.
    boolean explicitlyDeleted = from.isExplicitlyDeleted() && to.isExplicitlyDeleted();
    double amount = farm.get(RandomGeneratorFarm.Aspect.WITHDRAW_AMOUNT).nextDouble()
        * DatagenParams.withdrawMaxAmount;
    String comment = Dictionaries.randomTexts.getUniformDistRandomTextForComments(
        farm.get(RandomGeneratorFarm.Aspect.COMMON_COMMENT));

    from.getWithdraws().add(
        new Withdraw(from, to, amount, creationDate, deletionDate, multiplicityId, explicitlyDeleted, comment));
}

public static Withdraw createWithdraw(RandomGeneratorFarm farm, Account from, Account to, long multiplicityId) {
Random dateRand = farm.get(RandomGeneratorFarm.Aspect.WITHDRAW_DATE);
long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package ldbc.finbench.datagen.generation.events;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import ldbc.finbench.datagen.entities.edges.Transfer;
import ldbc.finbench.datagen.entities.edges.Withdraw;
import ldbc.finbench.datagen.entities.nodes.Account;
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.generation.distribution.DegreeDistribution;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;

/**
 * Generates activities for a block of accounts: transfer edges between the
 * accounts of the block and withdraw edges from accounts to card accounts.
 * Randomness is deterministic per block via {@link #resetState(int)}.
 *
 * <p>Fix in this revision: the candidate-target index list was a
 * {@code LinkedList} accessed by position ({@code get(j)}), which is O(n) per
 * access and made every scan of the candidate list accidentally quadratic.
 * It is now an {@code ArrayList}; iteration order and results are unchanged.
 */
public class AccountActivitiesEvent implements Serializable {
    private final RandomGeneratorFarm randomFarm;
    private final DegreeDistribution multiplicityDist;
    // Fraction of each account's raw degree budget used per shuffle round.
    private final double partRatio;
    private final Random randIndex;
    // Next multiplicity id per "fromId-toId" account pair (withdraw edges).
    private final Map<String, AtomicLong> multiplicityMap;

    public AccountActivitiesEvent() {
        this.partRatio = 1.0 / DatagenParams.transferShuffleTimes;
        randomFarm = new RandomGeneratorFarm();
        multiplicityDist = DatagenParams.getTransferMultiplicityDistribution();
        multiplicityDist.initialize();
        randIndex = new Random(DatagenParams.defaultSeed);
        multiplicityMap = new ConcurrentHashMap<>();
    }

    // Re-seed every generator so each block is deterministic and independent.
    private void resetState(int seed) {
        randomFarm.resetRandomGenerators(seed);
        multiplicityDist.reset(seed);
        randIndex.setSeed(seed);
    }

    // Builds the candidate transferTo index list [0, size). ArrayList (not
    // LinkedList): callers read it with get(j), which must be O(1).
    private List<Integer> getIndexList(int size) {
        List<Integer> indexList = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            indexList.add(i);
        }
        return indexList;
    }

    // Generation to parts will mess up the average degree (make it bigger than
    // expected) caused by ceiling operations. Also, it will mess up the long
    // tail range of the power-law distribution of degrees caused by 1 rounded
    // to 2. See the plot drawn by check_transfer.py for more details.
    public List<Account> accountActivities(List<Account> accounts, List<Account> cards, int blockId) {
        resetState(blockId);
        Random pickAccountForWithdrawal = randomFarm.get(RandomGeneratorFarm.Aspect.ACCOUNT_WHETHER_WITHDRAW);

        // Scale raw degrees down to this part's share of the total.
        for (Account account : accounts) {
            account.setMaxOutDegree((long) Math.ceil(account.getRawMaxOutDegree() * partRatio));
            account.setMaxInDegree((long) Math.ceil(account.getRawMaxInDegree() * partRatio));
        }
        List<Integer> availableToAccountIds = getIndexList(accounts.size()); // available transferTo accountIds
        for (int i = 0; i < accounts.size(); i++) {
            Account from = accounts.get(i);
            // Transfer to other accounts until the out-degree budget is spent.
            while (from.getAvailableOutDegree() != 0) {
                int skippedCount = 0;
                for (int j = 0; j < availableToAccountIds.size(); j++) {
                    int toIndex = availableToAccountIds.get(j);
                    Account to = accounts.get(toIndex);
                    if (toIndex == i || cannotTransfer(from, to)) {
                        skippedCount++;
                        continue;
                    }
                    long numTransfers = Math.min(multiplicityDist.nextDegree(),
                                                 Math.min(from.getAvailableOutDegree(), to.getAvailableInDegree()));
                    for (int mindex = 0; mindex < numTransfers; mindex++) {
                        Transfer.createTransferNew(randomFarm, from, to, mindex);
                    }
                    if (to.getAvailableInDegree() == 0) {
                        // Target saturated: remove so later scans skip it.
                        availableToAccountIds.remove(j);
                        j--;
                    }
                    if (from.getAvailableOutDegree() == 0) {
                        break;
                    }
                }
                if (skippedCount == availableToAccountIds.size()) {
                    System.out.println("[Transfer] All accounts skipped for " + from.getAccountId());
                    break; // end loop if all accounts are skipped
                }
            }
            // Withdraw from this account to randomly picked card accounts.
            if (pickAccountForWithdrawal.nextDouble() < DatagenParams.accountWithdrawFraction) {
                for (int count = 0; count < DatagenParams.maxWithdrawals; count++) {
                    Account to = cards.get(randIndex.nextInt(cards.size()));
                    if (cannotWithdraw(from, to)) {
                        continue;
                    }
                    Withdraw.createWithdrawNew(randomFarm, from, to, getMultiplicityIdAndInc(from, to));
                }
            }

        }
        return accounts;
    }

    // A transfer is disallowed when the two accounts' lifetimes do not overlap
    // by at least activityDelta, when source == target (transfer to self is
    // not allowed), or when either side's degree budget is exhausted.
    private boolean cannotTransfer(Account from, Account to) {
        return from.getDeletionDate() < to.getCreationDate() + DatagenParams.activityDelta
            || from.getCreationDate() + DatagenParams.activityDelta > to.getDeletionDate()
            || from.equals(to) || from.getAvailableOutDegree() == 0 || to.getAvailableInDegree() == 0;
    }

    // Same lifetime-overlap rule as transfers, without the degree checks.
    private boolean cannotWithdraw(Account from, Account to) {
        return from.getDeletionDate() < to.getCreationDate() + DatagenParams.activityDelta
            || from.getCreationDate() + DatagenParams.activityDelta > to.getDeletionDate()
            || from.equals(to);
    }

    // Returns the current multiplicity id for the (from, to) pair and bumps it.
    private long getMultiplicityIdAndInc(Account from, Account to) {
        String key = from.getAccountId() + "-" + to.getAccountId();
        AtomicLong atomicInt = multiplicityMap.computeIfAbsent(key, k -> new AtomicLong());
        return atomicInt.getAndIncrement();
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package ldbc.finbench.datagen.generation.events;

import java.io.Serializable;
import java.util.List;
import java.util.Random;
import ldbc.finbench.datagen.entities.edges.CompanyApplyLoan;
import ldbc.finbench.datagen.entities.edges.CompanyGuaranteeCompany;
import ldbc.finbench.datagen.entities.edges.CompanyOwnAccount;
import ldbc.finbench.datagen.entities.nodes.Account;
import ldbc.finbench.datagen.entities.nodes.Company;
import ldbc.finbench.datagen.entities.nodes.Loan;
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.generation.dictionary.Dictionaries;
import ldbc.finbench.datagen.generation.generators.AccountGenerator;
import ldbc.finbench.datagen.generation.generators.LoanGenerator;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;

/**
 * Generates activities for a block of companies: account registration,
 * company-guarantees-company edges, and loan applications. All randomness is
 * re-seeded per block so generation is deterministic.
 */
public class CompanyActivitiesEvent implements Serializable {
    private final RandomGeneratorFarm randomFarm;
    private final Random randIndex;

    public CompanyActivitiesEvent() {
        randomFarm = new RandomGeneratorFarm();
        randIndex = new Random(DatagenParams.defaultSeed);
    }

    // Re-seed the generator farm and the index picker for a new block.
    private void resetState(int seed) {
        randomFarm.resetRandomGenerators(seed);
        randIndex.setSeed(seed);
    }

    public List<Company> companyActivities(List<Company> companies, AccountGenerator accountGenerator,
                                           LoanGenerator loanGenerator, int blockId) {
        resetState(blockId);
        accountGenerator.resetState(blockId);

        // One dedicated random stream per decision, pulled from the farm.
        Random accountCountRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_ACCOUNTS_PER_COMPANY);

        Random guaranteeChanceRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_COMPANY_GUARANTEE);
        Random guaranteeCountRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_GUARANTEES_PER_COMPANY);

        Random loanChanceRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_COMPANY_FOR_LOAN);
        Random loanCountRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_LOANS_PER_COMPANY);
        Random loanDateRand = randomFarm.get(RandomGeneratorFarm.Aspect.COMPANY_APPLY_LOAN_DATE);

        for (Company company : companies) {
            // Every company registers at least one account.
            int accountCount = Math.max(1, accountCountRand.nextInt(DatagenParams.maxAccountsPerOwner));
            for (int i = 0; i < accountCount; i++) {
                Account account = accountGenerator.generateAccount(company.getCreationDate(), "company", blockId);
                CompanyOwnAccount.createCompanyOwnAccount(randomFarm, company, account, account.getCreationDate());
            }
            // A fraction of companies guarantee other (randomly picked) companies.
            if (guaranteeChanceRand.nextDouble() < DatagenParams.companyGuaranteeFraction) {
                int guaranteeCount = Math.max(1, guaranteeCountRand.nextInt(DatagenParams.maxTargetsToGuarantee));
                for (int i = 0; i < guaranteeCount; i++) {
                    Company target = companies.get(randIndex.nextInt(companies.size()));
                    if (company.canGuarantee(target)) {
                        CompanyGuaranteeCompany.createCompanyGuaranteeCompany(randomFarm, company, target);
                    }
                }
            }
            // A fraction of companies apply for loans.
            if (loanChanceRand.nextDouble() < DatagenParams.companyLoanFraction) {
                int loanCount = Math.max(1, loanCountRand.nextInt(DatagenParams.maxLoans));
                for (int i = 0; i < loanCount; i++) {
                    long applyDate = Dictionaries.dates.randomCompanyToLoanDate(loanDateRand, company);
                    Loan loan = loanGenerator.generateLoan(applyDate, "company", blockId);
                    CompanyApplyLoan.createCompanyApplyLoan(randomFarm, applyDate, company, loan);
                }
            }
        }

        return companies;
    }
}
Loading

0 comments on commit adeb687

Please sign in to comment.