You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
#!/usr/bin/env python
##########################################################
### Import Necessary Modules
### source: https://github.com/KrisChristensen/PrivateAllele/tree/main

import argparse  # provides options at the command line
import sys  # take command line arguments and uses it in the script
import gzip  # allows gzipped files to be read
import re  # allows regular expressions to be used

from tqdm import tqdm

##########################################################
### Command-line Arguments
parser = argparse.ArgumentParser(
    description="A script to identify SNPs that have alleles private to a population."
)
# Required options carry no ``default``: argparse never applies a default to a
# required option, so one would only suggest a fallback that does not exist.
parser.add_argument("-vcf", help="The location of the vcf file", required=True)
parser.add_argument(
    "-pop",
    help="The location of the population file (IndividualName<tab>IndivdualPopulation per line)",
    required=True,
)
# ``type=int`` makes the value an int whether it is supplied or defaulted and
# rejects non-numeric input while the command line is parsed.
parser.add_argument(
    "-min",
    help="The minimum number of individuals with the private allele in a population, default=1",
    default=1,
    type=int,
)
parser.add_argument(
    "-nsnp",
    help="The number of SNPs in the vcf file",
    type=int,
    required=True,
)
args = parser.parse_args()
##########################################################
### Variables
class Variables:
    """Class-level state shared by the file readers."""

    # individual name -> population name
    population = {}
    # population names, appended by the population-file reader
    populations = []
    # number of individuals counted while the population file is read
    numIndividuals = 0


##########################################################
### Body of script
class OpenFile:
    """Open an input file and hand the handle to the reader selected by ``typ``."""

    def __init__(self, f, typ, occ):
        """Opens a file (gzipped accepted) and dispatches it to a reader.

        Args:
            f: Path of the file to open. A name ending in ".gz" is opened
                with gzip in binary mode; any other name is opened as text.
            typ: "vcf" passes the handle to ``OpenVcf``, "pop" passes it to
                ``OpenPop``; any other value only opens the file.
            occ: Label for the file; echoed to stderr and forwarded to the
                reader.

        The open handle is stored on ``self.filename``.
        """
        # Literal suffix test, so that a name such as "samplesgz" (no dot) is
        # not mistaken for a gzip archive.
        if f.endswith(".gz"):
            self.filename = gzip.open(f, "rb")
        else:
            self.filename = open(f, "r")

        if typ == "vcf":
            sys.stderr.write(f"\nOpened vcf file: {occ}\n")
            OpenVcf(self.filename, occ)
        elif typ == "pop":
            sys.stderr.write(f"\nOpened pop file: {occ}\n")
            OpenPop(self.filename, occ)
class OpenVcf:
    """Scan a VCF stream and print alleles that are private to one population.

    For each record, the samples whose names appear in ``Variables.population``
    are grouped by population, and the populations carrying allele "0" and
    allele "1" are tallied.  An allele carried by exactly one population, at a
    site where at least two populations carry allele "0" or "1", is printed to
    stdout as one tab-separated row.
    """

    def __init__(self, f, o):
        """Reads a vcf file to identify private alleles.

        Args:
            f: Open, iterable file handle yielding VCF lines as ``str`` or
                ``bytes`` (a gzip handle opened in binary mode yields bytes).
                The handle is closed before the constructor returns, even if
                reading fails.
            o: Label of the file; accepted for interface compatibility and
                not used.

        Raises:
            ValueError: If a non-blank data line has fewer than 9 columns.
        """
        self.numMarkers = 0
        self.privateMarkers = {}
        self.individuals = []
        self.numInds = 0
        min_carriers = int(args.min)

        print("chr\tpos\tpop\tratio\tallele\toccured_in_N_ind\tN_ind\tOccured_in_N_pop")
        try:
            for raw_line in tqdm(f, desc="Processing VCF file", total=args.nsnp):
                # Binary (gzip) handles yield bytes; text handles yield str.
                if isinstance(raw_line, bytes):
                    line = raw_line.decode("utf-8")
                else:
                    line = raw_line

                if line.startswith("#CHROM"):
                    # Column header: sample names start at the 10th column.
                    self.individuals = line.split()[9:]
                    self.numInds = len(self.individuals)
                    continue
                if line.startswith("#"):
                    continue

                fields = line.split()
                if not fields:
                    continue  # tolerate blank lines
                if len(fields) < 9:
                    raise ValueError(
                        f"Malformed VCF record (expected at least 9 columns): {line!r}"
                    )
                self._process_record(fields, min_carriers)
        finally:
            f.close()

    def _process_record(self, fields, min_carriers):
        """Tally one record per population and print its private allele, if any."""
        chrom, pos = fields[0], fields[1]
        self.numMarkers += 1

        # carriers[allele][pop]: individuals in pop carrying at least one copy
        carriers = {"0": {}, "1": {}}
        # copies[allele][pop]: allele copies observed in pop
        copies = {"0": {}, "1": {}}
        # Per population: individuals / alleles with a fully called genotype
        called_inds = {}
        called_alleles = {}

        for position, sample in enumerate(fields[9:]):
            name = self.individuals[position]
            # Samples absent from the population file are skipped, so the VCF
            # and the population file may list different sets of individuals.
            if name not in Variables.population:
                continue
            pop = Variables.population[name]

            alleles = re.split(r"[/|]", sample.split(":")[0])
            # A genotype with any missing allele is treated as missing; this
            # also covers a bare "." and haploid calls.
            if "." in alleles:
                continue

            called_inds[pop] = called_inds.get(pop, 0) + 1
            called_alleles[pop] = called_alleles.get(pop, 0) + len(alleles)
            for allele in ("0", "1"):
                n_copies = alleles.count(allele)
                if n_copies:
                    carriers[allele][pop] = carriers[allele].get(pop, 0) + 1
                    copies[allele][pop] = copies[allele].get(pop, 0) + n_copies

        self.privateTest = carriers

        # Number of populations in which allele 0 or allele 1 was observed.
        data_from_nPop = len(set(carriers["0"]) | set(carriers["1"]))
        # An allele cannot be called private when only one population has
        # data, whether that population is monomorphic or polymorphic.
        if data_from_nPop < 2:
            return

        for allele in ("0", "1"):
            if len(carriers[allele]) != 1:
                continue
            pop, n_carriers = next(iter(carriers[allele].items()))
            if n_carriers >= min_carriers:
                # Frequency of the allele among the called alleles of pop
                # (missing genotypes are not counted).
                ratio = round(copies[allele][pop] / called_alleles[pop], 3)
                print(
                    f"{chrom}\t{pos}\t{pop}\t{ratio}\t{allele}\t{n_carriers}\t{called_inds[pop]}\t{data_from_nPop}"
                )
            # NOTE(review): as in the original if/elif, allele 1 is not
            # examined once allele 0 is confined to a single population, so a
            # fixed difference reports only allele 0 - confirm this is wanted.
            break
class OpenPop:
    """Load the individual-to-population table into ``Variables``."""

    def __init__(self, f, o):
        """Reads a population file to identify which individual goes to which population.

        Each data line holds two whitespace-separated columns: the individual
        name and its population.  Lines starting with "#" and blank lines are
        ignored.

        Args:
            f: Open, iterable file handle yielding lines as ``str`` or
                ``bytes``.  It is closed once reading ends, even on error.
            o: Label of the file; accepted for interface compatibility and
                not used.

        Raises:
            ValueError: If a data line does not have exactly two columns.
        """
        self.pops = {}  # population name -> number of individuals assigned to it
        try:
            for raw_line in f:
                # Binary (gzip) handles yield bytes; text handles yield str.
                if isinstance(raw_line, bytes):
                    line = raw_line.decode("utf-8")
                else:
                    line = raw_line
                if line.startswith("#"):
                    continue

                fields = line.split()
                if not fields:
                    continue  # tolerate blank lines, e.g. at the end of the file
                if len(fields) != 2:
                    raise ValueError(
                        f"Malformed population line (expected 2 columns): {line!r}"
                    )
                individual, pop = fields

                if individual in Variables.population:
                    previous = Variables.population[individual]
                    sys.stderr.write(
                        f"\tWarning: {individual} already defined for population {previous}, replacing with population {pop}\n\n"
                    )
                    # Move the individual instead of counting it twice, so the
                    # per-population sample counts stay accurate.
                    if self.pops.get(previous, 0) > 0:
                        self.pops[previous] -= 1
                        if not self.pops[previous]:
                            del self.pops[previous]
                else:
                    Variables.numIndividuals += 1

                Variables.population[individual] = pop
                self.pops[pop] = self.pops.get(pop, 0) + 1
        finally:
            f.close()

        for pop in sorted(self.pops):
            Variables.populations.append(pop)
            sys.stderr.write(
                "\tIdentified population {} with {} samples\n".format(
                    pop, self.pops[pop]
                )
            )
### Order of script ####
if __name__ == "__main__":
    # The population table is loaded first: the VCF reader looks every sample
    # up in Variables.population while it scans the records.  Variables holds
    # class-level attributes only, so it needs no instantiation.
    OpenFile(args.pop, "pop", args.pop)
    OpenFile(args.vcf, "vcf", args.vcf)
The text was updated successfully, but these errors were encountered:
The text was updated successfully, but these errors were encountered: