Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
qishipengqsp committed Sep 16, 2024
1 parent 22f0614 commit 4c95a33
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 2 deletions.
35 changes: 35 additions & 0 deletions src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,41 @@ public Transfer(Account fromAccount, Account toAccount, double amount, long crea
this.isExplicitlyDeleted = isExplicitlyDeleted;
}

public static void createTransferNew(RandomGeneratorFarm farm, Account from, Account to,
long multiplicityId) {
long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate());
long creationDate =
Dictionaries.dates.randomAccountToAccountDate(farm.get(RandomGeneratorFarm.Aspect.TRANSFER_DATE), from, to,
deleteDate);
boolean willDelete = from.isExplicitlyDeleted() && to.isExplicitlyDeleted();
double amount =
farm.get(RandomGeneratorFarm.Aspect.TRANSFER_AMOUNT).nextDouble() * DatagenParams.transferMaxAmount;
Transfer transfer = new Transfer(from, to, amount, creationDate, deleteDate, multiplicityId, willDelete);

// Set ordernum
String ordernum = Dictionaries.numbers.generateOrdernum(farm.get(RandomGeneratorFarm.Aspect.TRANSFER_ORDERNUM));
transfer.setOrdernum(ordernum);

// Set comment
String comment =
Dictionaries.randomTexts.getUniformDistRandomTextForComments(
farm.get(RandomGeneratorFarm.Aspect.COMMON_COMMENT));
transfer.setComment(comment);

// Set payType
String paytype =
Dictionaries.transferTypes.getUniformDistRandomText(farm.get(RandomGeneratorFarm.Aspect.TRANSFER_PAYTYPE));
transfer.setPayType(paytype);

// Set goodsType
String goodsType =
Dictionaries.transferTypes.getUniformDistRandomText(
farm.get(RandomGeneratorFarm.Aspect.TRANSFER_GOODSTYPE));
transfer.setGoodsType(goodsType);

from.getTransferOuts().add(transfer);
}

public static Transfer createTransfer(RandomGeneratorFarm farm, Account from, Account to,
long multiplicityId) {
long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate());
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/ldbc/finbench/datagen/entities/edges/Withdraw.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ public Withdraw(Account fromAccount, Account toAccount, double amount, long crea
this.comment = comment;
}

public static void createWithdrawNew(RandomGeneratorFarm farm, Account from, Account to, long multiplicityId) {
Random dateRand = farm.get(RandomGeneratorFarm.Aspect.WITHDRAW_DATE);
long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate());
long creationDate = Dictionaries.dates.randomAccountToAccountDate(dateRand, from, to, deleteDate);
boolean willDelete = from.isExplicitlyDeleted() && to.isExplicitlyDeleted();
double amount =
farm.get(RandomGeneratorFarm.Aspect.WITHDRAW_AMOUNT).nextDouble() * DatagenParams.withdrawMaxAmount;
String comment =
Dictionaries.randomTexts.getUniformDistRandomTextForComments(
farm.get(RandomGeneratorFarm.Aspect.COMMON_COMMENT));
Withdraw withdraw =
new Withdraw(from, to, amount, creationDate, deleteDate, multiplicityId, willDelete, comment);
from.getWithdraws().add(withdraw);
}

public static Withdraw createWithdraw(RandomGeneratorFarm farm, Account from, Account to, long multiplicityId) {
Random dateRand = farm.get(RandomGeneratorFarm.Aspect.WITHDRAW_DATE);
long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package ldbc.finbench.datagen.generation.events;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import ldbc.finbench.datagen.entities.edges.Transfer;
import ldbc.finbench.datagen.entities.edges.Withdraw;
import ldbc.finbench.datagen.entities.nodes.Account;
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.generation.distribution.DegreeDistribution;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;

public class AccountActivitiesEvent implements Serializable {
private final RandomGeneratorFarm randomFarm;
private final DegreeDistribution multiplicityDist;
private final double partRatio;
private final Random randIndex;
private final Map<String, AtomicLong> multiplicityMap;

public AccountActivitiesEvent() {
this.partRatio = 1.0 / DatagenParams.transferShuffleTimes;
randomFarm = new RandomGeneratorFarm();
multiplicityDist = DatagenParams.getTransferMultiplicityDistribution();
multiplicityDist.initialize();
randIndex = new Random(DatagenParams.defaultSeed);
multiplicityMap = new ConcurrentHashMap<>();
}

private void resetState(int seed) {
randomFarm.resetRandomGenerators(seed);
multiplicityDist.reset(seed);
randIndex.setSeed(seed);
}

private LinkedList<Integer> getIndexList(int size) {
LinkedList<Integer> indexList = new LinkedList<>();
for (int i = 0; i < size; i++) {
indexList.add(i);
}
return indexList;
}

// Generation to parts will mess up the average degree(make it bigger than expected) caused by ceiling operations.
// Also, it will mess up the long tail range of powerlaw distribution of degrees caused by 1 rounded to 2.
// See the plot drawn by check_transfer.py for more details.
public List<Account> accountActivities(List<Account> accounts, List<Account> cards, int blockId) {
resetState(blockId);
Random pickAccountForWithdrawal = randomFarm.get(RandomGeneratorFarm.Aspect.ACCOUNT_WHETHER_WITHDRAW);


// scale to percentage
for (Account account : accounts) {
account.setMaxOutDegree((long) Math.ceil(account.getRawMaxOutDegree() * partRatio));
account.setMaxInDegree((long) Math.ceil(account.getRawMaxInDegree() * partRatio));
}
LinkedList<Integer> availableToAccountIds = getIndexList(accounts.size()); // available transferTo accountIds
for (int i = 0; i < accounts.size(); i++) {
Account from = accounts.get(i);
// account transfer to other accounts
while (from.getAvailableOutDegree() != 0) {
int skippedCount = 0;
for (int j = 0; j < availableToAccountIds.size(); j++) {
int toIndex = availableToAccountIds.get(j);
Account to = accounts.get(toIndex);
if (toIndex == i || cannotTransfer(from, to)) {
skippedCount++;
continue;
}
long numTransfers = Math.min(multiplicityDist.nextDegree(),
Math.min(from.getAvailableOutDegree(), to.getAvailableInDegree()));
for (int mindex = 0; mindex < numTransfers; mindex++) {
Transfer.createTransferNew(randomFarm, from, to, mindex);
}
if (to.getAvailableInDegree() == 0) {
availableToAccountIds.remove(j);
j--;
}
if (from.getAvailableOutDegree() == 0) {
break;
}
}
if (skippedCount == availableToAccountIds.size()) {
System.out.println("[Transfer] All accounts skipped for " + from.getAccountId());
break; // end loop if all accounts are skipped
}
}
// account withdraw to cards
if (pickAccountForWithdrawal.nextDouble() < DatagenParams.accountWithdrawFraction) {
for (int count = 0; count < DatagenParams.maxWithdrawals; count++) {
Account to = cards.get(randIndex.nextInt(cards.size()));
if (cannotWithdraw(from, to)) {
continue;
}
Withdraw.createWithdrawNew(randomFarm, from, to, getMultiplicityIdAndInc(from, to));
}
}

}
return accounts;
}

// Transfer to self is not allowed
private boolean cannotTransfer(Account from, Account to) {
return from.getDeletionDate() < to.getCreationDate() + DatagenParams.activityDelta
|| from.getCreationDate() + DatagenParams.activityDelta > to.getDeletionDate()
|| from.equals(to) || from.getAvailableOutDegree() == 0 || to.getAvailableInDegree() == 0;
}

private boolean cannotWithdraw(Account from, Account to) {
return from.getDeletionDate() < to.getCreationDate() + DatagenParams.activityDelta
|| from.getCreationDate() + DatagenParams.activityDelta > to.getDeletionDate()
|| from.equals(to);
}

private long getMultiplicityIdAndInc(Account from, Account to) {
String key = from.getAccountId() + "-" + to.getAccountId();
AtomicLong atomicInt = multiplicityMap.computeIfAbsent(key, k -> new AtomicLong());
return atomicInt.getAndIncrement();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class ActivityGenerator()(implicit spark: SparkSession)
def withdrawEvent(accountRDD: RDD[Account]): RDD[Withdraw] = {
val withdrawEvent = new WithdrawEvent

val cards = accountRDD.filter(_.getType == "debit card").collect()
val cards = spark.sparkContext.broadcast(accountRDD.filter(_.getType == "debit card").collect().toList)
accountRDD
.filter(_.getType != "debit card")
.sample(
Expand All @@ -178,7 +178,7 @@ class ActivityGenerator()(implicit spark: SparkSession)
withdrawEvent
.withdraw(
sources.toList.asJava,
cards.toList.asJava,
cards.value.asJava,
index
)
.iterator()
Expand Down

0 comments on commit 4c95a33

Please sign in to comment.