-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchunktwelve.py
64 lines (49 loc) · 2.14 KB
/
chunktwelve.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
import sys
import os
#Take in user input for algorithm parameters
if len(sys.argv) != 4:
print("Incorrect script call, ensure call is formatted as: python3 chunketc.py <input> <output directory> <chunksize>")
sys.exit(1)
#Read in name of the file and the desired size of chunks in nucleotides
input_file = sys.argv[1]
output_directory = sys.argv[2]
chunk_size = int(sys.argv[3])
#Use SeqIO.parse instead of read for multifasta file IMPORTANT
records = list(SeqIO.parse(input_file, "fasta"))
def create_batch(records, chunk_size):
record_it = iter(records)
record = next(record_it)
current_base = 0
batch = []
batch_size = 0
# While there are new records, keep creating new batches.
while record:
# Loop over records untill the batch is full. (or no new records)
while batch_size != chunk_size and record:
end = current_base + chunk_size - batch_size
seq = record[current_base:end]
end_of_slice = current_base + len(seq) - 1
#Use record.id for pure name, record.description for contig details
fasta_header = record.description + ":{}-{}".format(current_base, end_of_slice)
seq.id = seq.name = fasta_header
seq.description = ''
batch.append(seq)
current_base += len(seq)
batch_size += len(seq)
# Current record is exhausted, get a new one.
if current_base >= len(record):
record = next(record_it, None)
current_base = 0
# We have a batch with the correct size
yield batch
batch = []
batch_size = 0
#Ensure output directory exists
os.makedirs(output_directory, exist_ok = True)
#Range over batches and export each chunk as a fasta file
for i, batch in enumerate(create_batch(records, chunk_size)):
#Add input_file to prevent file overwrite from chunk loading
filename = os.path.join(output_directory,(input_file+"chunk{}.fasta").format(i))
SeqIO.write(batch, filename, "fasta")