Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change format and truncate type #85

Merged
merged 9 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 94 additions & 27 deletions scripts/paramgen/parameter_curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
# Threshold value written for both "threshold1" and "threshold2" by process_withdraw_query.
THRESH_HOLD_6 = 0
# Cumulative-count cutoff used by the neighbor truncation loops; also written to the
# "truncationLimit" column of the generated parameter files.
TRUNCATION_LIMIT = 10000
# Default batch_size for get_next_sum_table.
BATCH_SIZE = 5000
# Value written to the "truncationOrder" column of the generated parameter files.
TRUNCATION_ORDER = "TIMESTAMP_ASCENDING"
# When True, the "*_month" factor tables are loaded in place of the "*_bucket" tables
# for the truncation bucket paths.
TIME_TRUNCATE = True

table_dir = '../../out/factor_table'
out_dir = '../../out/substitute_parameters/'
Expand Down Expand Up @@ -133,7 +135,7 @@ def filter_neighbors(account_list, amount_bucket_df, num_list, account_id):
sum_num = 0
header_at_limit = -1
for col in reversed(num_list):
sum_num += rows_amount_bucket[str(col)]
sum_num += rows_amount_bucket[col]
if sum_num >= TRUNCATION_LIMIT:
header_at_limit = col
break
Expand Down Expand Up @@ -166,7 +168,7 @@ def neighbors_with_truncate_threshold(transfer_in_amount, rows_account_list, row
sum_num = 0
header_at_limit = -1
for col in reversed(num_list):
sum_num += rows_amount_bucket[str(col)]
sum_num += rows_amount_bucket[col]
if sum_num >= TRUNCATION_LIMIT:
header_at_limit = col
break
Expand All @@ -184,7 +186,7 @@ def neighbors_with_trancate(rows_account_list, rows_amount_bucket, num_list):
sum_num = 0
header_at_limit = -1
for col in reversed(num_list):
sum_num += rows_amount_bucket[str(col)]
sum_num += rows_amount_bucket[col]
if sum_num >= TRUNCATION_LIMIT:
header_at_limit = col
break
Expand Down Expand Up @@ -219,7 +221,7 @@ def get_next_neighbor_list(neighbors_df, account_account_df, account_amount_df,

num_list = []
if query_id != 3 and query_id != 11:
num_list = [int(x) for x in amount_bucket_df.columns.tolist()]
num_list = [x for x in amount_bucket_df.columns.tolist()]

query_parallelism = max(1, multiprocessing.cpu_count() // 4)
# print(f'query_id {query_id} query_parallelism {query_parallelism}')
Expand All @@ -236,7 +238,7 @@ def get_next_neighbor_list(neighbors_df, account_account_df, account_amount_df,

def get_filter_neighbor_list(neighbors_df, amount_bucket_df):
first_column_name = neighbors_df.columns[0]
num_list = [int(x) for x in amount_bucket_df.columns.tolist()]
num_list = [x for x in amount_bucket_df.columns.tolist()]

query_parallelism = max(1, multiprocessing.cpu_count() // 4)
chunks = np.array_split(neighbors_df, query_parallelism)
Expand Down Expand Up @@ -268,8 +270,20 @@ def get_next_sum_table(neighbors_df, basic_sum_df, batch_size=BATCH_SIZE):
return result_df


def handleLoanParam(loan):
    """Convert a single identifier value to its string form.

    Used as a per-value handler passed to ``registerHandler``.

    Args:
        loan: The value to render.

    Returns:
        ``str(loan)``.
    """
    rendered = str(loan)
    return rendered
def handleThresholdParam(threshold):
    """Convert a threshold value to its string form.

    Used as the per-value handler for the threshold column passed to
    ``registerHandler``.

    Args:
        threshold: The threshold value to render.

    Returns:
        ``str(threshold)``.
    """
    as_text = str(threshold)
    return as_text

def handleThreshold2Param(threshold2):
    """Convert the second threshold value to its string form.

    Used as the per-value handler for the "threshold2" column passed to
    ``registerHandler``.

    Args:
        threshold2: The second threshold value to render.

    Returns:
        ``str(threshold2)``.
    """
    as_text = str(threshold2)
    return as_text

def hendleIdParam(id):
    """Convert an identifier value to its string form.

    The misspelled function name is kept because callers register this
    handler under that name; ``handleIdParam`` (defined right after this
    function) is a correctly spelled alias.

    Args:
        id: The identifier to render. The parameter name shadows the builtin
            ``id`` and is kept unchanged for compatibility.

    Returns:
        ``str(id)``.
    """
    return str(id)


# Correctly spelled alias, consistent with the other ``handle*Param`` helpers.
handleIdParam = hendleIdParam

def handleTruncateLimitParam(truncateLimit):
    """Convert a truncation-limit value to its string form.

    Used as the per-value handler for the "truncationLimit" column passed to
    ``registerHandler``.

    Args:
        truncateLimit: The truncation limit to render.

    Returns:
        ``str(truncateLimit)``.
    """
    limit_text = str(truncateLimit)
    return limit_text

def handleTruncateOrderParam(truncateOrder):
    """Pass a truncation-order value through unchanged.

    Used as the per-value handler for the "truncationOrder" column passed to
    ``registerHandler``. No string conversion is applied.

    Args:
        truncateOrder: The truncation order value.

    Returns:
        The same object that was passed in.
    """
    order = truncateOrder
    return order


def handleTimeDurationParam(timeParam):
Expand Down Expand Up @@ -301,31 +315,43 @@ def process_iter_queries(query_id):
first_account_path = os.path.join(table_dir, 'loan_account_list')
account_account_path = os.path.join(table_dir, 'trans_withdraw_items')
upstream_amount_path = os.path.join(table_dir, 'upstream_amount')
amount_bucket_path = os.path.join(table_dir, 'trans_withdraw_bucket')
if TIME_TRUNCATE:
amount_bucket_path = os.path.join(table_dir, 'trans_withdraw_month')
else:
amount_bucket_path = os.path.join(table_dir, 'trans_withdraw_bucket')
time_bucket_path = os.path.join(table_dir, 'trans_withdraw_month')
output_path = os.path.join(out_dir, 'tcr8.txt')
steps = 2

elif query_id == 1:
first_account_path = os.path.join(table_dir, 'account_transfer_out_list')
account_account_path = os.path.join(table_dir, 'account_transfer_out_items')
amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket')
if TIME_TRUNCATE:
amount_bucket_path = os.path.join(table_dir, 'transfer_out_month')
else:
amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket')
time_bucket_path = os.path.join(table_dir, 'transfer_out_month')
output_path = os.path.join(out_dir, 'tcr1.txt')
steps = 2

elif query_id == 5:
first_account_path = os.path.join(table_dir, 'person_account_list')
account_account_path = os.path.join(table_dir, 'account_transfer_out_items')
amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket')
if TIME_TRUNCATE:
amount_bucket_path = os.path.join(table_dir, 'transfer_out_month')
else:
amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket')
time_bucket_path = os.path.join(table_dir, 'transfer_out_month')
output_path = out_dir
steps = 3

elif query_id == 2:
first_account_path = os.path.join(table_dir, 'person_account_list')
account_account_path = os.path.join(table_dir, 'account_transfer_in_items')
amount_bucket_path = os.path.join(table_dir, 'transfer_in_bucket')
if TIME_TRUNCATE:
amount_bucket_path = os.path.join(table_dir, 'transfer_in_month')
else:
amount_bucket_path = os.path.join(table_dir, 'transfer_in_bucket')
time_bucket_path = os.path.join(table_dir, 'transfer_in_month')
output_path = os.path.join(out_dir, 'tcr2.txt')
steps = 3
Expand Down Expand Up @@ -394,35 +420,47 @@ def process_iter_queries(query_id):

final_first_items = search_params.generate(first_array, 0.01)
time_list = time_select.findTimeParams(final_first_items, next_time_bucket)
truncate_limit_list = [TRUNCATION_LIMIT] * len(final_first_items)
truncate_order_list = [TRUNCATION_ORDER] * len(final_first_items)
thresh_list = [THRESH_HOLD] * len(final_first_items)

if query_id == 8:
csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "loanId")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleThresholdParam, thresh_list, "threshold")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id {query_id} finished')

elif query_id == 1:
csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "account_id")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id {query_id} finished')

elif query_id == 5:
csvWriter_5 = CSVSerializer()
csvWriter_5.setOutputFile(output_path + 'tcr5.txt')
csvWriter_5.registerHandler(handleLoanParam, final_first_items, "personId")
csvWriter_5.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_5.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_5.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_5.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_12 = CSVSerializer()
csvWriter_12.setOutputFile(output_path + 'tcr12.txt')
csvWriter_12.registerHandler(handleLoanParam, final_first_items, "personId")
csvWriter_12.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_12.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_12.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_12.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_5.writeCSV()
csvWriter_12.writeCSV()
Expand All @@ -432,8 +470,10 @@ def process_iter_queries(query_id):
elif query_id == 2:
csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "personId")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id {query_id} finished')
Expand All @@ -455,15 +495,19 @@ def process_iter_queries(query_id):

csvWriter_3 = CSVSerializer()
csvWriter_3.setOutputFile(output_path + 'tcr3.txt')
csvWriter_3.registerHandler(handleLoanParam, final_first_items, "id1")
csvWriter_3.registerHandler(handleLoanParam, final_second_items_3, "id2")
csvWriter_3.registerHandler(hendleIdParam, final_first_items, "id1")
csvWriter_3.registerHandler(hendleIdParam, final_second_items_3, "id2")
csvWriter_3.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_3.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_3.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_4 = CSVSerializer()
csvWriter_4.setOutputFile(output_path + 'tcr4.txt')
csvWriter_4.registerHandler(handleLoanParam, final_first_items, "id1")
csvWriter_4.registerHandler(handleLoanParam, final_second_items_4, "id2")
csvWriter_4.registerHandler(hendleIdParam, final_first_items, "id1")
csvWriter_4.registerHandler(hendleIdParam, final_second_items_4, "id2")
csvWriter_4.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_4.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_4.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_3.writeCSV()
csvWriter_4.writeCSV()
Expand All @@ -473,8 +517,10 @@ def process_iter_queries(query_id):
elif query_id == 11:
csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "id")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id {query_id} finished')
Expand Down Expand Up @@ -502,17 +548,27 @@ def process_1_hop_query(query_id):
final_first_items = search_params.generate(first_array, 0.01)
time_list = time_select.findTimeParams(final_first_items, time_bucket_df)

truncate_limit_list = [TRUNCATION_LIMIT] * len(final_first_items)
truncate_order_list = [TRUNCATION_ORDER] * len(final_first_items)
thresh_list = [THRESH_HOLD] * len(final_first_items)

if query_id == 7:

csvWriter_7 = CSVSerializer()
csvWriter_7.setOutputFile(output_path + 'tcr7.txt')
csvWriter_7.registerHandler(handleLoanParam, final_first_items, "account_id")
csvWriter_7.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_7.registerHandler(handleThresholdParam, thresh_list, "threshold")
csvWriter_7.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_7.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_7.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_9 = CSVSerializer()
csvWriter_9.setOutputFile(output_path + 'tcr9.txt')
csvWriter_9.registerHandler(handleLoanParam, final_first_items, "account_id")
csvWriter_9.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_9.registerHandler(handleThresholdParam, thresh_list, "threshold")
csvWriter_9.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_9.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_9.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_7.writeCSV()
csvWriter_9.writeCSV()
Expand All @@ -531,8 +587,8 @@ def process_1_hop_query(query_id):

csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "pid1")
csvWriter.registerHandler(handleLoanParam, final_second_items, "pid2")
csvWriter.registerHandler(hendleIdParam, final_first_items, "pid1")
csvWriter.registerHandler(hendleIdParam, final_second_items, "pid2")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.writeCSV()

Expand All @@ -543,7 +599,10 @@ def process_withdraw_query():

first_account_path = os.path.join(table_dir, 'account_withdraw_in_items')
time_bucket_path = os.path.join(table_dir, 'transfer_in_month')
withdraw_bucket_path = os.path.join(table_dir, 'withdraw_in_bucket')
if TIME_TRUNCATE:
withdraw_bucket_path = os.path.join(table_dir, 'withdraw_in_month')
else:
withdraw_bucket_path = os.path.join(table_dir, 'withdraw_in_bucket')
transfer_bucket_path = os.path.join(table_dir, 'transfer_in_bucket')
output_path = os.path.join(out_dir, 'tcr6.txt')

Expand Down Expand Up @@ -576,10 +635,18 @@ def process_withdraw_query():
final_first_items = search_params.generate(first_array, 0.01)
time_list = time_select.findTimeParams(final_first_items, next_time_bucket)

truncate_limit_list = [TRUNCATION_LIMIT] * len(final_first_items)
truncate_order_list = [TRUNCATION_ORDER] * len(final_first_items)
thresh_list = [THRESH_HOLD_6] * len(final_first_items)

csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "id")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleThresholdParam, thresh_list, "threshold1")
csvWriter.registerHandler(handleThreshold2Param, thresh_list, "threshold2")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id 6 finished')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ object FactorGenerationStage extends DatagenStage {
.save(s"${args.outputDir}/factor_table/account_in_out_list")

// Project each edge dataset down to an id column plus "createTime".
// The resulting two-column frames are the inputs for the per-month tables built below.
val transferInTimeRDD = transferRDD.select(col("toId"), col("createTime"))
val withdrawInTimeRDD = withdrawRDD.select(col("toId"), col("createTime"))
val personGuaranteeTimeRDD = personGuaranteeRDD.select(col("fromId"), col("createTime"))
val InvestInTimeRDD = personInvestRDD.select(col("investorId"), col("createTime"))
val transferOutTimeRDD = transferRDD.select(col("fromId"), col("createTime"))
Expand All @@ -302,6 +303,15 @@ object FactorGenerationStage extends DatagenStage {
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save(s"${args.outputDir}/factor_table/transfer_in_month")

// Run processByMonth over the withdraw (toId, createTime) projection.
// NOTE(review): the last argument ("account_id") is presumably the name of the id column
// in the output — confirm against processByMonth.
val withdrawInPivotRDD = processByMonth(withdrawInTimeRDD, "toId", "createTime", "account_id")

// Write the result as '|'-delimited CSV with a header row under
// factor_table/withdraw_in_month in the configured output directory.
withdrawInPivotRDD
.write
.option("header", "true")
.option("delimiter", "|")
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save(s"${args.outputDir}/factor_table/withdraw_in_month")

val personGuaranteePivotRDD = processByMonth(personGuaranteeTimeRDD, "fromId", "createTime", "person_id")

personGuaranteePivotRDD
Expand Down
Loading