Skip to content

Commit

Permalink
add table and parallel curation (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
hujiatao0 authored Jun 19, 2024
1 parent 0b7ecb5 commit bc2a408
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 45 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ tune.log
sf*/
sf*.tar
sf*.tar.gz

scripts/paramgen/__pycache__/
88 changes: 58 additions & 30 deletions scripts/paramgen/parameter_curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import codecs
from datetime import date
from glob import glob
import concurrent.futures

THRESH_HOLD = 0
TRUNCATION_LIMIT = 10000
Expand Down Expand Up @@ -95,24 +96,51 @@ def find_neighbors(account_list, account_account_df, account_amount_df, amount_b
return list(result)


# def get_next_neighbor_list(neighbors_df, account_account_df, account_amount_df, amount_bucket_df):
# next_neighbors_df = neighbors_df
# num_list = [int(x) for x in amount_bucket_df.iloc[0].index.tolist()[1:]]
# next_neighbors_df['account_list'] = next_neighbors_df['account_list'].apply(lambda x: find_neighbors(x, account_account_df, account_amount_df, amount_bucket_df, num_list))
# return next_neighbors_df


def process_chunk(chunk, account_account_df, account_amount_df, amount_bucket_df, num_list):
chunk['account_list'] = chunk['account_list'].apply(
lambda x: find_neighbors(x, account_account_df, account_amount_df, amount_bucket_df, num_list)
)
return chunk


def get_next_neighbor_list(neighbors_df, account_account_df, account_amount_df, amount_bucket_df):
next_neighbors_df = neighbors_df
num_list = [int(x) for x in amount_bucket_df.iloc[0].index.tolist()[1:]]
next_neighbors_df['account_list'] = next_neighbors_df['account_list'].apply(lambda x: find_neighbors(x, account_account_df, account_amount_df, amount_bucket_df, num_list))
num_list = [int(x) for x in amount_bucket_df.columns.tolist()]

chunks = np.array_split(neighbors_df, 8)

with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(process_chunk, chunk, account_account_df, account_amount_df, amount_bucket_df, num_list) for chunk in chunks]
results = [future.result() for future in concurrent.futures.as_completed(futures)]

next_neighbors_df = pd.concat(results)
next_neighbors_df = next_neighbors_df.sort_index()
return next_neighbors_df


def get_next_sum_table(neighbors_df, basic_sum_df):
result_data = []
for index, row in neighbors_df.iterrows():
loan_id = row['loan_id']
account_list = row['account_list']
add_frame = basic_sum_df.loc[basic_sum_df.index.isin(account_list)]
add_frame = add_frame.rename_axis('loan_id')
sum_result = add_frame.sum(axis=0).astype(int)
sum_result['loan_id'] = loan_id
result_data.append(sum_result.to_dict())
return pd.DataFrame(result_data)
# result_data = []
# for index, row in neighbors_df.iterrows():
# loan_id = row['loan_id']
# account_list = row['account_list']
# add_frame = basic_sum_df.loc[basic_sum_df.index.isin(account_list)]
# add_frame = add_frame.rename_axis('loan_id')
# sum_result = add_frame.sum(axis=0).astype(int)
# sum_result['loan_id'] = loan_id
# result_data.append(sum_result.to_dict())
# return pd.DataFrame(result_data)

neighbors_exploded = neighbors_df.explode('account_list')
merged_df = neighbors_exploded.merge(basic_sum_df, left_on='account_list', right_index=True, how='left').drop(columns=['account_list'])
result_df = merged_df.groupby('loan_id').sum().astype(int)

return result_df


def handleLoanParam(loan):
Expand All @@ -129,10 +157,10 @@ def handleTimeDurationParam(timeParam):
def main():

loan_account_path = '../../out/factor_table/loan_account_list'
account_account_path = '../../out/factor_table/account_items'
account_amount_path = '../../out/factor_table/amount'
amount_bucket_path = '../../out/factor_table/amount_bucket'
time_bucket_path = '../../out/factor_table/month'
account_account_path = '../../out/factor_table/trans_withdraw_items'
account_amount_path = '../../out/factor_table/upstream_amount'
amount_bucket_path = '../../out/factor_table/trans_withdraw_bucket'
time_bucket_path = '../../out/factor_table/trans_withdraw_month'
output_path = '../../out/substitute_parameters/'


Expand All @@ -151,38 +179,38 @@ def main():

steps = 3
current_step = 0
neighbors_df = loan_account_df
final_array = neighbors_df['loan_id'].to_numpy()
loan_neighbors_df = loan_account_df.sort_values(by='loan_id')
loan_array = loan_neighbors_df['loan_id'].to_numpy()
next_time_bucket = None

while current_step < steps:

next_amount_bucket = get_next_sum_table(neighbors_df, amount_bucket_df)
next_amount_bucket.set_index('loan_id', inplace=True)
result_array = next_amount_bucket.to_numpy().sum(axis=1)
final_array = np.column_stack((final_array, result_array))
next_loan_amount_bucket = get_next_sum_table(loan_neighbors_df, amount_bucket_df)
# next_loan_amount_bucket.set_index('loan_id', inplace=True)
temp_loan_array = next_loan_amount_bucket.to_numpy().sum(axis=1)
loan_array = np.column_stack((loan_array, temp_loan_array))

if current_step == steps - 1:
next_time_bucket = get_next_sum_table(neighbors_df, time_bucket_df)
next_time_bucket.set_index('loan_id', inplace=True)
next_time_bucket = get_next_sum_table(loan_neighbors_df, time_bucket_df)
# next_time_bucket.set_index('loan_id', inplace=True)

# print(neighbors_df)
# print(next_amount_bucket)
# print(next_time_bucket)
# print(final_array)
# print(loan_array)

else:
neighbors_df = get_next_neighbor_list(neighbors_df, account_account_df, account_amount_df, amount_bucket_df)
loan_neighbors_df = get_next_neighbor_list(loan_neighbors_df, account_account_df, account_amount_df, amount_bucket_df)

current_step += 1


result = search_params.generate(final_array, 0.01)
time_list = time_select.findTimeParams(result, next_time_bucket)
final_loan = search_params.generate(loan_array, 0.01)
time_list = time_select.findTimeParams(final_loan, next_time_bucket)

csvWriter = CSVSerializer()
csvWriter.setOutputFile(output_path + "tcr8.txt")
csvWriter.registerHandler(handleLoanParam, result, "loanId")
csvWriter.registerHandler(handleLoanParam, final_loan, "loanId")
csvWriter.registerHandler(handleTimeDurationParam, time_list, "startDate|endDate")

csvWriter.writeCSV()
Expand Down
Loading

0 comments on commit bc2a408

Please sign in to comment.