From 6983de2e5e6c7fdfe689bc797e8d6b62468bc1d1 Mon Sep 17 00:00:00 2001
From: francois-a
Date: Sat, 6 Apr 2024 21:24:10 +0000
Subject: [PATCH] Added parquet compatibility

---
 rnaseq/src/combine_GCTs.py           | 65 +++++++++++++++-------------
 rnaseq/src/process_star_junctions.py |  2 +-
 2 files changed, 37 insertions(+), 30 deletions(-)

diff --git a/rnaseq/src/combine_GCTs.py b/rnaseq/src/combine_GCTs.py
index c98b6066..97c94b69 100755
--- a/rnaseq/src/combine_GCTs.py
+++ b/rnaseq/src/combine_GCTs.py
@@ -5,48 +5,55 @@ import argparse
 import gzip
 import os
+import qtl.io
 
 parser = argparse.ArgumentParser(description='Combine GCT files')
 parser.add_argument('input_files', nargs='+', help='List of GCT files, or file containing paths to GCTs.')
-parser.add_argument('output_prefix', help='Prefix for output file: .gct.gz')
+parser.add_argument('prefix', help='Prefix for output file: .gct.gz')
+parser.add_argument('--parquet', action='store_true', help='Write to parquet format instead of GCT')
 parser.add_argument('-o', '--output_dir', default='.', help='Output directory')
 args = parser.parse_args()
 
-if len(args.input_files)==1 and '.gct' not in args.input_files[0]:
+if len(args.input_files) == 1 and '.gct' not in args.input_files[0]:
     with open(args.input_files[0]) as f:
         paths = f.read().strip().split('\n')
 else:
     paths = args.input_files
 
-sample_ids = np.array([os.path.split(i)[1].split('.')[0] for i in paths])
-assert len(sample_ids)==len(np.unique(sample_ids))
-# sort by sample_id
-i = np.argsort(sample_ids)
-sample_ids = sample_ids[i]
-paths = np.array(paths)[i]
+path_dict = {os.path.split(i)[1].split('.')[0]:i for i in paths}
+sample_ids = sorted(path_dict.keys())
+assert len(sample_ids) == len(paths)
 
-df = pd.read_csv(paths[0], sep='\t', skiprows=3, header=None, index_col=0, names=['Name','Description', sample_ids[0]])
-if df[sample_ids[0]].dtype==np.float64:
-    dtype = np.float32
-elif df[sample_ids[0]].dtype==np.int64:
-    dtype = np.int32
-else:
-    dtype = df[sample_ids[0]].dtype.type
-
-gct_df = pd.DataFrame(0, index=df.index, columns=['Description']+list(sample_ids), dtype=dtype)
-gct_df['Description'] = df['Description']
-gct_df[sample_ids[0]] = df[sample_ids[0]].astype(dtype)
-for k,(i,p) in enumerate(zip(sample_ids[1:], paths[1:])):
-    print('\rProcessing {}/{}'.format(k+2, len(paths)), end='', flush=True)
-    df = pd.read_csv(p, sep='\t', skiprows=3, header=None, usecols=[0,2], index_col=0, names=['Name',i], dtype={'Name':str, i:dtype})
-    gct_df[i] = df[i]
-print()
+# detect format
+if all([i.endswith('.parquet') for i in paths]):
+    sample_id = sample_ids[0]
+    df = pd.read_parquet(path_dict[sample_id])
+    gct_df = pd.DataFrame(0, index=df.index, columns=['Description']+list(sample_ids), dtype=df[sample_id].dtype)
+    gct_df['Description'] = df['Description']
+    gct_df[sample_id] = df[sample_id]
+    for k, sample_id in enumerate(sample_ids[1:], 2):
+        print(f"\rProcessing {k}/{len(sample_ids)}", end='' if k < len(sample_ids) else None, flush=True)
+        df = pd.read_parquet(path_dict[sample_id], columns=[sample_id])
+        gct_df[sample_id] = df[sample_id]
+else:  # .gct or .gct.gz
+    sample_id = sample_ids[0]
+    df = pd.read_csv(path_dict[sample_id], sep='\t', skiprows=3, header=None, index_col=0, names=['Name', 'Description', sample_id])
+    if df[sample_id].dtype == np.float64:
+        dtype = np.float32
+    elif df[sample_id].dtype == np.int64:
+        dtype = np.int32
+    else:
+        dtype = df[sample_id].dtype.type
+    gct_df = pd.DataFrame(0, index=df.index, columns=['Description']+list(sample_ids), dtype=dtype)
+    gct_df['Description'] = df['Description']
+    gct_df[sample_id] = df[sample_id].astype(dtype)
+    for k, sample_id in enumerate(sample_ids[1:], 2):
+        print(f"\rProcessing {k}/{len(sample_ids)}", end='' if k < len(sample_ids) else None, flush=True)
+        df = pd.read_csv(path_dict[sample_id], sep='\t', skiprows=3, header=None, usecols=[0,2],
+                         index_col=0, names=['Name', sample_id], dtype={'Name':str, sample_id:dtype})
+        gct_df[sample_id] = df[sample_id]
 
 if args.parquet:
-    gct_df.to_parquet(os.path.join(args.output_dir, args.output_prefix+'.parquet'))
+    gct_df.to_parquet(os.path.join(args.output_dir, f"{args.prefix}.gct.parquet"))
 else:
-    with gzip.open(os.path.join(args.output_dir, args.output_prefix+'.gct.gz'), 'wt', compresslevel=6) as f:
-        f.write('#1.2\n')
-        f.write('{0:d}\t{1:d}\n'.format(gct_df.shape[0], gct_df.shape[1]-1))
-        gct_df.to_csv(f, sep='\t', float_format='%.6g')
+    qtl.io.write_gct(gct_df, os.path.join(args.output_dir, f"{args.prefix}.gct.gz"), float_format='%.6g', compresslevel=6)
diff --git a/rnaseq/src/process_star_junctions.py b/rnaseq/src/process_star_junctions.py
index 2a9269bf..f79a7da4 100644
--- a/rnaseq/src/process_star_junctions.py
+++ b/rnaseq/src/process_star_junctions.py
@@ -50,7 +50,7 @@
 
 # use unique-mapping reads
 gct_df = reference_df[['gene_id']].join(junctions_df['n_unique'])
-gct_df['n_unique'] = gct_df['n_unique'].fillna(0).astype(int)
+gct_df['n_unique'] = gct_df['n_unique'].fillna(0).astype(np.int32)
 # write as GCT
 gct_df.rename(columns={'gene_id':'Description', 'n_unique':args.prefix}, inplace=True)
 gct_df.index.name = 'Name'