Skip to content

Commit

Permalink
change format and truncate type (#85)
Browse files Browse the repository at this point in the history
* add table and parallel curation

* solve hard path encoder

* solve python path encoder

* fix path bug

* change output format

* add truncate type

---------

Co-authored-by: hujiatao <[email protected]>
  • Loading branch information
hujiatao0 and hujiatao authored Jul 16, 2024
1 parent f1827c9 commit 45d52db
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 27 deletions.
121 changes: 94 additions & 27 deletions scripts/paramgen/parameter_curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
THRESH_HOLD_6 = 0
TRUNCATION_LIMIT = 10000
BATCH_SIZE = 5000
TRUNCATION_ORDER = "TIMESTAMP_ASCENDING"
TIME_TRUNCATE = True

table_dir = '../../out/factor_table'
out_dir = '../../out/substitute_parameters/'
Expand Down Expand Up @@ -133,7 +135,7 @@ def filter_neighbors(account_list, amount_bucket_df, num_list, account_id):
sum_num = 0
header_at_limit = -1
for col in reversed(num_list):
sum_num += rows_amount_bucket[str(col)]
sum_num += rows_amount_bucket[col]
if sum_num >= TRUNCATION_LIMIT:
header_at_limit = col
break
Expand Down Expand Up @@ -166,7 +168,7 @@ def neighbors_with_truncate_threshold(transfer_in_amount, rows_account_list, row
sum_num = 0
header_at_limit = -1
for col in reversed(num_list):
sum_num += rows_amount_bucket[str(col)]
sum_num += rows_amount_bucket[col]
if sum_num >= TRUNCATION_LIMIT:
header_at_limit = col
break
Expand All @@ -184,7 +186,7 @@ def neighbors_with_trancate(rows_account_list, rows_amount_bucket, num_list):
sum_num = 0
header_at_limit = -1
for col in reversed(num_list):
sum_num += rows_amount_bucket[str(col)]
sum_num += rows_amount_bucket[col]
if sum_num >= TRUNCATION_LIMIT:
header_at_limit = col
break
Expand Down Expand Up @@ -219,7 +221,7 @@ def get_next_neighbor_list(neighbors_df, account_account_df, account_amount_df,

num_list = []
if query_id != 3 and query_id != 11:
num_list = [int(x) for x in amount_bucket_df.columns.tolist()]
num_list = [x for x in amount_bucket_df.columns.tolist()]

query_parallelism = max(1, multiprocessing.cpu_count() // 4)
# print(f'query_id {query_id} query_parallelism {query_parallelism}')
Expand All @@ -236,7 +238,7 @@ def get_next_neighbor_list(neighbors_df, account_account_df, account_amount_df,

def get_filter_neighbor_list(neighbors_df, amount_bucket_df):
first_column_name = neighbors_df.columns[0]
num_list = [int(x) for x in amount_bucket_df.columns.tolist()]
num_list = [x for x in amount_bucket_df.columns.tolist()]

query_parallelism = max(1, multiprocessing.cpu_count() // 4)
chunks = np.array_split(neighbors_df, query_parallelism)
Expand Down Expand Up @@ -268,8 +270,20 @@ def get_next_sum_table(neighbors_df, basic_sum_df, batch_size=BATCH_SIZE):
return result_df


def handleLoanParam(loan):
    """Return ``loan`` converted to its string form with ``str()``.

    Passed as the handler callable to ``CSVSerializer.registerHandler``
    for id-like parameter columns (e.g. "loanId", "account_id").
    """
    # The body must be indented under the ``def``; a flush-left ``return``
    # is an IndentationError.
    return str(loan)
def handleThresholdParam(threshold):
    """Return ``threshold`` converted to its string form with ``str()``.

    Passed as the handler callable to ``CSVSerializer.registerHandler``
    for the "threshold" / "threshold1" columns.
    """
    # The body must be indented under the ``def``; a flush-left ``return``
    # is an IndentationError.
    return str(threshold)

def handleThreshold2Param(threshold2):
    """Return ``threshold2`` converted to its string form with ``str()``.

    Passed as the handler callable to ``CSVSerializer.registerHandler``
    for the "threshold2" column.
    """
    # The body must be indented under the ``def``; a flush-left ``return``
    # is an IndentationError.
    return str(threshold2)

def hendleIdParam(id):
    """Return ``id`` converted to its string form with ``str()``.

    Passed as the handler callable to ``CSVSerializer.registerHandler``
    for the id columns ("id", "id1", "id2", "pid1", "pid2").

    NOTE: the function name is misspelt ("hendle") and the parameter
    shadows the ``id`` builtin; both are kept unchanged because the
    registration call sites refer to this exact name.
    """
    # The body must be indented under the ``def``; a flush-left ``return``
    # is an IndentationError.
    return str(id)

def handleTruncateLimitParam(truncateLimit):
    """Return ``truncateLimit`` converted to its string form with ``str()``.

    Passed as the handler callable to ``CSVSerializer.registerHandler``
    for the "truncationLimit" column.
    """
    # The body must be indented under the ``def``; a flush-left ``return``
    # is an IndentationError.
    return str(truncateLimit)

def handleTruncateOrderParam(truncateOrder):
    """Return ``truncateOrder`` unchanged.

    Passed as the handler callable to ``CSVSerializer.registerHandler``
    for the "truncationOrder" column. Unlike the sibling handlers, no
    ``str()`` conversion is applied; the value is passed through as-is.
    """
    # The body must be indented under the ``def``; a flush-left ``return``
    # is an IndentationError.
    return truncateOrder


def handleTimeDurationParam(timeParam):
Expand Down Expand Up @@ -301,31 +315,43 @@ def process_iter_queries(query_id):
first_account_path = os.path.join(table_dir, 'loan_account_list')
account_account_path = os.path.join(table_dir, 'trans_withdraw_items')
upstream_amount_path = os.path.join(table_dir, 'upstream_amount')
amount_bucket_path = os.path.join(table_dir, 'trans_withdraw_bucket')
if TIME_TRUNCATE:
amount_bucket_path = os.path.join(table_dir, 'trans_withdraw_month')
else:
amount_bucket_path = os.path.join(table_dir, 'trans_withdraw_bucket')
time_bucket_path = os.path.join(table_dir, 'trans_withdraw_month')
output_path = os.path.join(out_dir, 'tcr8.txt')
steps = 2

elif query_id == 1:
first_account_path = os.path.join(table_dir, 'account_transfer_out_list')
account_account_path = os.path.join(table_dir, 'account_transfer_out_items')
amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket')
if TIME_TRUNCATE:
amount_bucket_path = os.path.join(table_dir, 'transfer_out_month')
else:
amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket')
time_bucket_path = os.path.join(table_dir, 'transfer_out_month')
output_path = os.path.join(out_dir, 'tcr1.txt')
steps = 2

elif query_id == 5:
first_account_path = os.path.join(table_dir, 'person_account_list')
account_account_path = os.path.join(table_dir, 'account_transfer_out_items')
amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket')
if TIME_TRUNCATE:
amount_bucket_path = os.path.join(table_dir, 'transfer_out_month')
else:
amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket')
time_bucket_path = os.path.join(table_dir, 'transfer_out_month')
output_path = out_dir
steps = 3

elif query_id == 2:
first_account_path = os.path.join(table_dir, 'person_account_list')
account_account_path = os.path.join(table_dir, 'account_transfer_in_items')
amount_bucket_path = os.path.join(table_dir, 'transfer_in_bucket')
if TIME_TRUNCATE:
amount_bucket_path = os.path.join(table_dir, 'transfer_in_month')
else:
amount_bucket_path = os.path.join(table_dir, 'transfer_in_bucket')
time_bucket_path = os.path.join(table_dir, 'transfer_in_month')
output_path = os.path.join(out_dir, 'tcr2.txt')
steps = 3
Expand Down Expand Up @@ -394,35 +420,47 @@ def process_iter_queries(query_id):

final_first_items = search_params.generate(first_array, 0.01)
time_list = time_select.findTimeParams(final_first_items, next_time_bucket)
truncate_limit_list = [TRUNCATION_LIMIT] * len(final_first_items)
truncate_order_list = [TRUNCATION_ORDER] * len(final_first_items)
thresh_list = [THRESH_HOLD] * len(final_first_items)

if query_id == 8:
csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "loanId")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleThresholdParam, thresh_list, "threshold")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id {query_id} finished')

elif query_id == 1:
csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "account_id")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id {query_id} finished')

elif query_id == 5:
csvWriter_5 = CSVSerializer()
csvWriter_5.setOutputFile(output_path + 'tcr5.txt')
csvWriter_5.registerHandler(handleLoanParam, final_first_items, "personId")
csvWriter_5.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_5.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_5.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_5.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_12 = CSVSerializer()
csvWriter_12.setOutputFile(output_path + 'tcr12.txt')
csvWriter_12.registerHandler(handleLoanParam, final_first_items, "personId")
csvWriter_12.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_12.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_12.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_12.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_5.writeCSV()
csvWriter_12.writeCSV()
Expand All @@ -432,8 +470,10 @@ def process_iter_queries(query_id):
elif query_id == 2:
csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "personId")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id {query_id} finished')
Expand All @@ -455,15 +495,19 @@ def process_iter_queries(query_id):

csvWriter_3 = CSVSerializer()
csvWriter_3.setOutputFile(output_path + 'tcr3.txt')
csvWriter_3.registerHandler(handleLoanParam, final_first_items, "id1")
csvWriter_3.registerHandler(handleLoanParam, final_second_items_3, "id2")
csvWriter_3.registerHandler(hendleIdParam, final_first_items, "id1")
csvWriter_3.registerHandler(hendleIdParam, final_second_items_3, "id2")
csvWriter_3.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_3.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_3.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_4 = CSVSerializer()
csvWriter_4.setOutputFile(output_path + 'tcr4.txt')
csvWriter_4.registerHandler(handleLoanParam, final_first_items, "id1")
csvWriter_4.registerHandler(handleLoanParam, final_second_items_4, "id2")
csvWriter_4.registerHandler(hendleIdParam, final_first_items, "id1")
csvWriter_4.registerHandler(hendleIdParam, final_second_items_4, "id2")
csvWriter_4.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_4.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_4.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_3.writeCSV()
csvWriter_4.writeCSV()
Expand All @@ -473,8 +517,10 @@ def process_iter_queries(query_id):
elif query_id == 11:
csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "id")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id {query_id} finished')
Expand Down Expand Up @@ -502,17 +548,27 @@ def process_1_hop_query(query_id):
final_first_items = search_params.generate(first_array, 0.01)
time_list = time_select.findTimeParams(final_first_items, time_bucket_df)

truncate_limit_list = [TRUNCATION_LIMIT] * len(final_first_items)
truncate_order_list = [TRUNCATION_ORDER] * len(final_first_items)
thresh_list = [THRESH_HOLD] * len(final_first_items)

if query_id == 7:

csvWriter_7 = CSVSerializer()
csvWriter_7.setOutputFile(output_path + 'tcr7.txt')
csvWriter_7.registerHandler(handleLoanParam, final_first_items, "account_id")
csvWriter_7.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_7.registerHandler(handleThresholdParam, thresh_list, "threshold")
csvWriter_7.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_7.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_7.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_9 = CSVSerializer()
csvWriter_9.setOutputFile(output_path + 'tcr9.txt')
csvWriter_9.registerHandler(handleLoanParam, final_first_items, "account_id")
csvWriter_9.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_9.registerHandler(handleThresholdParam, thresh_list, "threshold")
csvWriter_9.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_9.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_9.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_7.writeCSV()
csvWriter_9.writeCSV()
Expand All @@ -531,8 +587,8 @@ def process_1_hop_query(query_id):

csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "pid1")
csvWriter.registerHandler(handleLoanParam, final_second_items, "pid2")
csvWriter.registerHandler(hendleIdParam, final_first_items, "pid1")
csvWriter.registerHandler(hendleIdParam, final_second_items, "pid2")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.writeCSV()

Expand All @@ -543,7 +599,10 @@ def process_withdraw_query():

first_account_path = os.path.join(table_dir, 'account_withdraw_in_items')
time_bucket_path = os.path.join(table_dir, 'transfer_in_month')
withdraw_bucket_path = os.path.join(table_dir, 'withdraw_in_bucket')
if TIME_TRUNCATE:
withdraw_bucket_path = os.path.join(table_dir, 'withdraw_in_month')
else:
withdraw_bucket_path = os.path.join(table_dir, 'withdraw_in_bucket')
transfer_bucket_path = os.path.join(table_dir, 'transfer_in_bucket')
output_path = os.path.join(out_dir, 'tcr6.txt')

Expand Down Expand Up @@ -576,10 +635,18 @@ def process_withdraw_query():
final_first_items = search_params.generate(first_array, 0.01)
time_list = time_select.findTimeParams(final_first_items, next_time_bucket)

truncate_limit_list = [TRUNCATION_LIMIT] * len(final_first_items)
truncate_order_list = [TRUNCATION_ORDER] * len(final_first_items)
thresh_list = [THRESH_HOLD_6] * len(final_first_items)

csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path)
csvWriter.registerHandler(handleLoanParam, final_first_items, "id")
csvWriter.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter.registerHandler(handleThresholdParam, thresh_list, "threshold1")
csvWriter.registerHandler(handleThreshold2Param, thresh_list, "threshold2")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")
csvWriter.writeCSV()

print(f'query_id 6 finished')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ object FactorGenerationStage extends DatagenStage {
.save(s"${args.outputDir}/factor_table/account_in_out_list")

val transferInTimeRDD = transferRDD.select(col("toId"), col("createTime"))
val withdrawInTimeRDD = withdrawRDD.select(col("toId"), col("createTime"))
val personGuaranteeTimeRDD = personGuaranteeRDD.select(col("fromId"), col("createTime"))
val InvestInTimeRDD = personInvestRDD.select(col("investorId"), col("createTime"))
val transferOutTimeRDD = transferRDD.select(col("fromId"), col("createTime"))
Expand All @@ -302,6 +303,15 @@ object FactorGenerationStage extends DatagenStage {
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save(s"${args.outputDir}/factor_table/transfer_in_month")

val withdrawInPivotRDD = processByMonth(withdrawInTimeRDD, "toId", "createTime", "account_id")

withdrawInPivotRDD
.write
.option("header", "true")
.option("delimiter", "|")
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save(s"${args.outputDir}/factor_table/withdraw_in_month")

val personGuaranteePivotRDD = processByMonth(personGuaranteeTimeRDD, "fromId", "createTime", "person_id")

personGuaranteePivotRDD
Expand Down

0 comments on commit 45d52db

Please sign in to comment.