-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPooledVariant.py
executable file
·114 lines (110 loc) · 3.82 KB
/
PooledVariant.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#=========================================================================
# This is OPEN SOURCE SOFTWARE governed by the Gnu General Public
# License (GPL) version 3, as described at www.opensource.org.
# Copyright (C)2016 William H. Majoros ([email protected]).
#=========================================================================
from __future__ import (absolute_import, division, print_function,
unicode_literals, generators, nested_scopes, with_statement)
from builtins import (bytes, dict, int, list, object, range, str, ascii,
chr, hex, input, next, oct, open, pow, round, super, filter, map, zip)
from Pool import Pool
from Replicate import Replicate
import copy
#=========================================================================
# Attributes:
# ID : string
# pools : array of Pool
# Instance Methods:
# PooledVariant(ID)
# addPool(Pool)
# int numPools()
# int getMaxDnaReps()
# int getMaxRnaReps()
# int[] getDnaReps()
# int[] getRnaReps()
# int[] getFreqs()
# newVar=collapse()
#
# var.forceEqualFreqs()
# bool dropHomozygousPools() # False if all pools were dropped
# int numHomozygousPools()
# string print()
# Class Methods:
#
#=========================================================================
class PooledVariant:
"""PooledVariant"""
def __init__(self,ID):
self.ID=ID
self.pools=[]
def addPool(self,pool):
self.pools.append(pool)
def numPools(self):
return len(self.pools)
def getFreqs(self):
freqs=[pool.freq for pool in self.pools]
return freqs
def getDnaReps(self):
return [len(pool.DNA) for pool in self.pools]
def getRnaReps(self):
return [len(pool.RNA) for pool in self.pools]
def getMaxDnaReps(self):
return max(self.getDnaReps())
def getMaxRnaReps(self):
return max(self.getRnaReps())
def getAveFreq(self): ### Assumes equal pool sizes!
freqs=[pool.freq for pool in self.pools]
ave=sum(freqs)/len(freqs)
return ave
def collapseReps(self,reps,into):
for rep in reps:
into.ref+=rep.ref
into.alt+=rep.alt
def forceEqualFreqs(self,p,q):
for pool in self.pools:
pool.changeFreqAndResample(p,q)
def collapse(self):
newVar=PooledVariant(self.ID)
aveFreq=self.getAveFreq()
newPool=Pool(1,aveFreq)
dna=Replicate(0,0); rna=Replicate(0,0)
for pool in self.pools:
self.collapseReps(pool.DNA,dna)
self.collapseReps(pool.RNA,rna)
newPool.DNA.append(dna); newPool.RNA.append(rna)
newVar.pools.append(newPool)
return newVar
def duplicateFirstPool(self):
n=len(self.pools)
first=self.pools[0]
self.pools=[]
for i in range(n):
new=copy.deepcopy(first)
self.pools.append(new)
new.index=i
def collapseReplicates(self):
for pool in self.pools: pool.collapseReplicates()
def numHomozygousPools(self):
n=0
for pool in self.pools:
if(not pool.isHetPool()): n+=1
return n
def dropHomozygousPools(self):
newPools=[]
for pool in self.pools:
#if(pool.hasHetDnaRep()): newPools.append(pool)
if(pool.isHetPool()): newPools.append(pool)
self.pools=newPools
return len(newPools)>0
def print(self):
text="(variant (id "+self.ID+")\n"
for pool in self.pools:
text+="\t(pool "+str(pool.index)+"\n"
text+="\t\t(freq "+str(pool.freq)+")\n"
for rep in pool.DNA:
text+="\t\t(DNA (ref "+str(rep.ref)+") (alt "+str(rep.alt)+"))\n"
for rep in pool.RNA:
text+="\t\t(RNA (ref "+str(rep.ref)+") (alt "+str(rep.alt)+"))\n"
text+="\t)\n"
text+=")"
return text