-
Notifications
You must be signed in to change notification settings - Fork 1
/
mapper.py
129 lines (100 loc) · 3.54 KB
/
mapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class Sequence:
def __init__(self, lines):
self.name = lines[0].strip()[1:]
self.bases = "".join([x.strip() for x in lines[1:]]).upper()
def __str__(self):
return self.name + ": " + self.bases[:20] + "..."
def __repr__(self):
return self.__str__()
class Read(Sequence):
def get_seed(self, seedlength):
return self.bases[:seedlength]
def replace_kmers(self, replacements):
pass
class Reference(Sequence):
def __init__(self, lines):
self.kmers = None
super().__init__(lines)
def calculate_kmers(self, kmersize):
self.kmers = {}
for pos in range(0, len(self.bases) - kmersize + 1):
kmer = self.bases[pos:(pos + kmersize)]
if kmer not in self.kmers:
self.kmers[kmer] = []
self.kmers[kmer] += [pos]
def get_kmer_positions(self, kmer):
if self.kmers is None or len(next(iter(self.kmers))) != len(kmer):
self.calculate_kmers(len(kmer))
if kmer not in self.kmers:
return []
return self.kmers[kmer]
def count_mismatches(self, read, position):
mismatches = 0
for pos in range(position, position+len(read.bases)):
if pos >= len(self.bases):
break
if read.bases[pos-position] != self.bases[pos]:
mismatches += 1
# Count every base of the read that goes out past the end of the reference as a mismatch
mismatches += position+len(read.bases)-pos-1
return mismatches
class Mapping:
def __init__(self, reference):
self.reference = reference
self.reads = {}
def add_read(self, read, position):
if position not in self.reads:
self.reads[position] = []
self.reads[position] += [read]
def get_reads_at_position(self, position):
if position not in self.reads:
return []
return self.reads[position]
def __str__(self):
res = ["Mapping to " + self.reference.name]
for pos in self.reads:
res += [" " + str(len(self.reads[pos])) + " reads mapping at " + str(pos)]
return "\n".join(res)
class SAMWriter:
def __init__(self, mapping):
pass
def write_mapping(self, filename):
pass
class ReadPolisher:
def __init__(self, kmerlen):
pass
def add_read(self, readseq):
pass
def get_replacements(self, minfreq):
pass
def read_fasta(fastafile, klassname):
klass = globals()[klassname]
f = open(fastafile, "r")
readlines = []
reads = []
for line in f:
if line[0] == '>' and len(readlines) != 0:
reads += [klass(readlines)]
readlines = []
readlines += [line]
reads += [klass(readlines)]
f.close()
return reads
def map_reads(reads, reference, kmersize, max_mismatches):
mapping = Mapping(reference)
reference.calculate_kmers(kmersize)
for read in reads:
seed = read.get_seed(kmersize)
seed_positions = reference.get_kmer_positions(seed)
for position in seed_positions:
mismatches = reference.count_mismatches(read, position)
if mismatches < max_mismatches:
mapping.add_read(read, position)
return mapping
def main():
reads = read_fasta("data/fluA_reads.fasta", Read.__name__)
reference = read_fasta("data/fluA.fasta", Reference.__name__)[0]
mapping = map_reads(reads, reference, 8, 5)
print("Mapping reads: ", len(mapping.reads))
if __name__ == "__main__":
main()