-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfasta_reader.py
135 lines (113 loc) · 4.5 KB
/
fasta_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from diskhash import StructHash
__author__ = 'glazek'
class IndexedFastaReader(object):
def __init__(self, fasta_file, index_file=None, use_mmap=False):
"""
Reader of fasta file indexed with diskhash index.
:param fasta_file: path to fasta file
:param index_file: path to index file (diskhash index). If None, uses (fasta_file + '.dhi')
"""
if index_file is None:
index_file = (fasta_file[:-len('.xz')] if fasta_file.endswith('.xz') else fasta_file) + '.dhi'
self.index = StructHash(index_file, 0, '2l', 'r')
if fasta_file.endswith('.xz'):
if use_mmap:
raise ValueError("use_mmap cannot be used with xz files")
try:
import xz
except ImportError:
raise ImportError("Module xz not found.\n"
"The standard Python lzma module does not support random access.\n\n"
"It can be installed with `pip install python-xz`\n")
self.fasta = xz.open(fasta_file, 'rb')
else:
self.fasta = open(fasta_file, 'rb')
self.use_mmap = use_mmap
if use_mmap:
import mmap
self.data = mmap.mmap(self.fasta.fileno(), 0, access=mmap.ACCESS_READ)
def __del__(self):
self.fasta.close()
def lookup(self, sequence_id):
"""
Lookup coordinates in the index file.
:param sequence_id: sequence identifier
:return: tuple: start position, length (in bases) and a boolean flag
indicating if the sequence includes newline characters.
"""
try:
s, lm = self.index.lookup(sequence_id)
return s, lm >> 1, bool(lm%2)
except TypeError:
return None
def get(self, sequence_id):
"""
Get the sequence corresponding to given sequence identifier.
Note: if you only need the length of the sequence, use get_length method instead.
:param sequence_id: sequence identifier
:return: sequence corresponding to given identifier or None if identifier not found
"""
coordinates = self.lookup(sequence_id=sequence_id)
if not coordinates:
return None
start, length, multiline = coordinates
if self.use_mmap:
if not multiline:
return self.data[start:start+length]
end = self.data.find(b'\n>', start)
if end == -1:
seqdata = self.data[start:]
else:
seqdata = self.data[start:end]
seqdata = seqdata.replace(b'\n', b'')
assert len(seqdata) == length, "Retrieved length should match stored length"
return seqdata
self.fasta.seek(start)
if multiline:
sequence = []
while True:
line = self.fasta.readline().strip()
if not line or line.startswith(b'>'):
break
sequence.append(line)
return b''.join(sequence)
return self.fasta.read(length)
def get_length(self, sequence_id):
"""
Get the length of the sequence corresponding to given identifier.
:param sequence_id: sequence identifier
:return: sequence length (in bases) or None if identifier not found
"""
coordinates = self.lookup(sequence_id)
if not coordinates:
return None
return coordinates[1]
def parse_args():
import argparse
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('fasta_file', metavar='FASTA',
help='fasta file')
parser.add_argument('--index', required=False,
help='path to the index file (by default: fasta_file + .dhi',
dest='index_file')
parser.add_argument('query', metavar='QUERY',
nargs='*',
help='Name(s) of sequence(s) to retrieve')
parser.add_argument('-F',
dest='query_file')
return parser.parse_args()
def main():
from itertools import chain
args = parse_args()
ix = IndexedFastaReader(args.fasta_file, args.index_file)
qs = [args.query]
if args.query_file:
qs.append(open(args.query_file))
for q in chain.from_iterable(qs):
q = q.strip()
s = ix.get(q)
if s:
print(">"+q)
print(s.decode('ascii'))
if __name__ == '__main__':
main()