From 4c95a33786013a1555ce8f89f32f2fa38819dd67 Mon Sep 17 00:00:00 2001 From: qishipengqsp Date: Mon, 16 Sep 2024 14:39:11 +0800 Subject: [PATCH] wip --- .../datagen/entities/edges/Transfer.java | 35 +++++ .../datagen/entities/edges/Withdraw.java | 15 +++ .../events/AccountActivitiesEvent.java | 125 ++++++++++++++++++ .../generators/ActivityGenerator.scala | 4 +- 4 files changed, 177 insertions(+), 2 deletions(-) create mode 100644 src/main/java/ldbc/finbench/datagen/generation/events/AccountActivitiesEvent.java diff --git a/src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java b/src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java index e112d682..6a045bda 100644 --- a/src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java +++ b/src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java @@ -32,6 +32,41 @@ public Transfer(Account fromAccount, Account toAccount, double amount, long crea this.isExplicitlyDeleted = isExplicitlyDeleted; } + public static void createTransferNew(RandomGeneratorFarm farm, Account from, Account to, + long multiplicityId) { + long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate()); + long creationDate = + Dictionaries.dates.randomAccountToAccountDate(farm.get(RandomGeneratorFarm.Aspect.TRANSFER_DATE), from, to, + deleteDate); + boolean willDelete = from.isExplicitlyDeleted() && to.isExplicitlyDeleted(); + double amount = + farm.get(RandomGeneratorFarm.Aspect.TRANSFER_AMOUNT).nextDouble() * DatagenParams.transferMaxAmount; + Transfer transfer = new Transfer(from, to, amount, creationDate, deleteDate, multiplicityId, willDelete); + + // Set ordernum + String ordernum = Dictionaries.numbers.generateOrdernum(farm.get(RandomGeneratorFarm.Aspect.TRANSFER_ORDERNUM)); + transfer.setOrdernum(ordernum); + + // Set comment + String comment = + Dictionaries.randomTexts.getUniformDistRandomTextForComments( + farm.get(RandomGeneratorFarm.Aspect.COMMON_COMMENT)); + transfer.setComment(comment); + + // Set payType + String paytype = + Dictionaries.transferTypes.getUniformDistRandomText(farm.get(RandomGeneratorFarm.Aspect.TRANSFER_PAYTYPE)); + transfer.setPayType(paytype); + + // Set goodsType + String goodsType = + Dictionaries.transferTypes.getUniformDistRandomText( + farm.get(RandomGeneratorFarm.Aspect.TRANSFER_GOODSTYPE)); + transfer.setGoodsType(goodsType); + + from.getTransferOuts().add(transfer); + } + public static Transfer createTransfer(RandomGeneratorFarm farm, Account from, Account to, long multiplicityId) { long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate()); diff --git a/src/main/java/ldbc/finbench/datagen/entities/edges/Withdraw.java b/src/main/java/ldbc/finbench/datagen/entities/edges/Withdraw.java index 7a3c6473..1d070e29 100644 --- a/src/main/java/ldbc/finbench/datagen/entities/edges/Withdraw.java +++ b/src/main/java/ldbc/finbench/datagen/entities/edges/Withdraw.java @@ -30,6 +30,21 @@ public Withdraw(Account fromAccount, Account toAccount, double amount, long crea this.comment = comment; } + public static void createWithdrawNew(RandomGeneratorFarm farm, Account from, Account to, long multiplicityId) { + Random dateRand = farm.get(RandomGeneratorFarm.Aspect.WITHDRAW_DATE); + long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate()); + long creationDate = Dictionaries.dates.randomAccountToAccountDate(dateRand, from, to, deleteDate); + boolean willDelete = from.isExplicitlyDeleted() && to.isExplicitlyDeleted(); + double amount = + farm.get(RandomGeneratorFarm.Aspect.WITHDRAW_AMOUNT).nextDouble() * DatagenParams.withdrawMaxAmount; + String comment = + Dictionaries.randomTexts.getUniformDistRandomTextForComments( + farm.get(RandomGeneratorFarm.Aspect.COMMON_COMMENT)); + Withdraw withdraw = + new Withdraw(from, to, amount, creationDate, deleteDate, multiplicityId, willDelete, comment); + from.getWithdraws().add(withdraw); + } + public static Withdraw createWithdraw(RandomGeneratorFarm farm, Account from, Account to, long multiplicityId) { Random dateRand = farm.get(RandomGeneratorFarm.Aspect.WITHDRAW_DATE); long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate()); diff --git a/src/main/java/ldbc/finbench/datagen/generation/events/AccountActivitiesEvent.java b/src/main/java/ldbc/finbench/datagen/generation/events/AccountActivitiesEvent.java new file mode 100644 index 00000000..0e780052 --- /dev/null +++ b/src/main/java/ldbc/finbench/datagen/generation/events/AccountActivitiesEvent.java @@ -0,0 +1,125 @@ +package ldbc.finbench.datagen.generation.events; + +import java.io.Serializable; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import ldbc.finbench.datagen.entities.edges.Transfer; +import ldbc.finbench.datagen.entities.edges.Withdraw; +import ldbc.finbench.datagen.entities.nodes.Account; +import ldbc.finbench.datagen.generation.DatagenParams; +import ldbc.finbench.datagen.generation.distribution.DegreeDistribution; +import ldbc.finbench.datagen.util.RandomGeneratorFarm; + +public class AccountActivitiesEvent implements Serializable { + private final RandomGeneratorFarm randomFarm; + private final DegreeDistribution multiplicityDist; + private final double partRatio; + private final Random randIndex; + private final Map multiplicityMap; + + public AccountActivitiesEvent() { + this.partRatio = 1.0 / DatagenParams.transferShuffleTimes; + randomFarm = new RandomGeneratorFarm(); + multiplicityDist = DatagenParams.getTransferMultiplicityDistribution(); + multiplicityDist.initialize(); + randIndex = new Random(DatagenParams.defaultSeed); + multiplicityMap = new ConcurrentHashMap<>(); + } + + private void resetState(int seed) { + randomFarm.resetRandomGenerators(seed); + multiplicityDist.reset(seed); + randIndex.setSeed(seed); + } + + private LinkedList getIndexList(int size) { + LinkedList indexList = new LinkedList<>(); + for (int i = 0; i < size; i++) { + indexList.add(i); + } + return indexList; + } + + // Generation to parts will mess up the average degree(make it bigger than expected) caused by ceiling operations. + // Also, it will mess up the long tail range of powerlaw distribution of degrees caused by 1 rounded to 2. + // See the plot drawn by check_transfer.py for more details. + public List accountActivities(List accounts, List cards, int blockId) { + resetState(blockId); + Random pickAccountForWithdrawal = randomFarm.get(RandomGeneratorFarm.Aspect.ACCOUNT_WHETHER_WITHDRAW); + + + // scale to percentage + for (Account account : accounts) { + account.setMaxOutDegree((long) Math.ceil(account.getRawMaxOutDegree() * partRatio)); + account.setMaxInDegree((long) Math.ceil(account.getRawMaxInDegree() * partRatio)); + } + LinkedList availableToAccountIds = getIndexList(accounts.size()); // available transferTo accountIds + for (int i = 0; i < accounts.size(); i++) { + Account from = accounts.get(i); + // account transfer to other accounts + while (from.getAvailableOutDegree() != 0) { + int skippedCount = 0; + for (int j = 0; j < availableToAccountIds.size(); j++) { + int toIndex = availableToAccountIds.get(j); + Account to = accounts.get(toIndex); + if (toIndex == i || cannotTransfer(from, to)) { + skippedCount++; + continue; + } + long numTransfers = Math.min(multiplicityDist.nextDegree(), + Math.min(from.getAvailableOutDegree(), to.getAvailableInDegree())); + for (int mindex = 0; mindex < numTransfers; mindex++) { + Transfer.createTransferNew(randomFarm, from, to, mindex); + } + if (to.getAvailableInDegree() == 0) { + availableToAccountIds.remove(j); + j--; + } + if (from.getAvailableOutDegree() == 0) { + break; + } + } + if (skippedCount == availableToAccountIds.size()) { + System.out.println("[Transfer] All accounts skipped for " + from.getAccountId()); + break; // end loop if all accounts are skipped + } + } + // account withdraw to cards + if (pickAccountForWithdrawal.nextDouble() < DatagenParams.accountWithdrawFraction) { + for (int count = 0; count < DatagenParams.maxWithdrawals; count++) { + Account to = cards.get(randIndex.nextInt(cards.size())); + if (cannotWithdraw(from, to)) { + continue; + } + Withdraw.createWithdrawNew(randomFarm, from, to, getMultiplicityIdAndInc(from, to)); + } + } + + } + return accounts; + } + + // Transfer to self is not allowed + private boolean cannotTransfer(Account from, Account to) { + return from.getDeletionDate() < to.getCreationDate() + DatagenParams.activityDelta + || from.getCreationDate() + DatagenParams.activityDelta > to.getDeletionDate() + || from.equals(to) || from.getAvailableOutDegree() == 0 || to.getAvailableInDegree() == 0; + } + + private boolean cannotWithdraw(Account from, Account to) { + return from.getDeletionDate() < to.getCreationDate() + DatagenParams.activityDelta + || from.getCreationDate() + DatagenParams.activityDelta > to.getDeletionDate() + || from.equals(to); + } + + private long getMultiplicityIdAndInc(Account from, Account to) { + String key = from.getAccountId() + "-" + to.getAccountId(); + AtomicLong atomicInt = multiplicityMap.computeIfAbsent(key, k -> new AtomicLong()); + return atomicInt.getAndIncrement(); + } + +} diff --git a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala index f8fcc73b..bbfcf065 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala @@ -166,7 +166,7 @@ class ActivityGenerator()(implicit spark: SparkSession) def withdrawEvent(accountRDD: RDD[Account]): RDD[Withdraw] = { val withdrawEvent = new WithdrawEvent - val cards = accountRDD.filter(_.getType == "debit card").collect() + val cards = spark.sparkContext.broadcast(accountRDD.filter(_.getType == "debit card").collect().toList) accountRDD .filter(_.getType != "debit card") .sample( @@ -178,7 +178,7 @@ class ActivityGenerator()(implicit spark: SparkSession) withdrawEvent .withdraw( sources.toList.asJava, - cards.toList.asJava, + cards.value.asJava, index ) .iterator()