-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpeakdet.py
More file actions
133 lines (111 loc) · 4.48 KB
/
peakdet.py
File metadata and controls
133 lines (111 loc) · 4.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import numpy as np
import matplotlib.pyplot as plt
def peakdet(y, delta):
"""
Return the indices (indices_min, indices_max) of local maxima and minima
("peaks") in the 1-d array y.
A point is considered a maximum if it locally has the maximal value, and is
preceded and followed by a value lower by delta.
Based on http://billauer.co.il/peakdet.html.
"""
if delta < 0:
raise ValueError("delta must be non-negative")
# try starting with search for max peaks or min peaks first
# take whichever returns more peaks
# todo: could be optimized to not completely go through the array twice
mins1, maxes1, _ = _peakdet(y, delta, lookformax=True)
mins2, maxes2, _ = _peakdet(y, delta, lookformax=False)
if len(mins1) + len(maxes1) > len(mins2) + len(maxes2):
return mins1, maxes1
else:
return mins2, maxes2
def peakdet_wrapped(y, delta):
"""
Return the indices (indices_min, indices_max) of local maxima and minima
("peaks") in the 1-d array y. y is interpreted to wrap around the ends. For
exámple a distribution over angles.
A point is considered a maximum if it locally has the maximal value, and is
preceded and followed by a value lower by delta.
Based on http://billauer.co.il/peakdet.html.
"""
if delta < 0:
raise ValueError("delta must be non-negative")
mins1, maxes1, state = _peakdet(y, delta, lookformax=True)
# start again from the beginning with the previous state so that peaks wrapped around
# the ends are now also detected
mins2, maxes2, state = _peakdet(y, delta, lookformax=True, state=state)
return mins2, maxes2
def _peakdet(y, delta, lookformax, state=None):
"""
Returns (indices_min, indices_max, state). Starts with looking for max when
lookformax == True, else min. Suppresses first extremum.
"""
indices_min = []
indices_max = []
if state:
mn, mx, mnpos, mxpos, lookformax, first_peak = state
else:
mn, mx = float("inf"), -float("inf")
mnpos, mxpos = None, None
first_peak = True
for i, this in enumerate(y):
# keep track of the current max and min
if this > mx:
mx = this
mxpos = i
if this < mn:
mn = this
mnpos = i
if lookformax and this < mx - delta:
# we have fallen delta below the current max, this is now a legitimate max,
# start looking for a min now
if not first_peak:
indices_max.append(mxpos)
mnpos, mn = i, this
lookformax = False
first_peak = False
elif not lookformax and this > mn + delta:
# we have risen delta above the current min, this is now a legitimate min,
# start looking for a max now
if not first_peak:
indices_min.append(mnpos)
mxpos, mx = i, this
lookformax = True
first_peak = False
state = (mn, mx, mnpos, mxpos, lookformax, first_peak)
return indices_min, indices_max, state
def get_testdata(mus):
xs = np.linspace(0, 360, 1000)
sigma = 50
ys = np.zeros_like(xs)
for mu in mus:
for k in [-1, 0, 1]:
ys += 100*np.exp(-(xs-mu + 360*k)**2/sigma**2)
return xs, ys
def check_peakdet(fun, mus, mins_should, maxes_should):
xs, ys = get_testdata(mus)
mins, maxes = fun(ys, delta=50)
np.testing.assert_allclose(sorted(xs[mins]) if len(mins) else [], sorted(mins_should), atol=1)
np.testing.assert_allclose(sorted(xs[maxes]) if len(maxes) else [], sorted(maxes_should), atol=1)
def test_normal():
check_peakdet(peakdet, [20], [200], [])
check_peakdet(peakdet, [100], [], [100])
check_peakdet(peakdet, [50, 250], [150], [50, 250])
check_peakdet(peakdet, [], [], [])
check_peakdet(peakdet, [100, 180], [], [110])
def test_wrapped():
check_peakdet(peakdet_wrapped, [20], [200], [20])
check_peakdet(peakdet_wrapped, [100], [280], [100])
check_peakdet(peakdet_wrapped, [50, 250], [150, 330], [50, 250])
check_peakdet(peakdet_wrapped, [], [], [])
check_peakdet(peakdet_wrapped, [100, 180], [320], [110])
if __name__ == "__main__":
xs, ys = get_testdata([100, 180])
ys += np.random.normal(size=ys.shape)
mins, maxes = peakdet_wrapped(ys, delta=50)
plt.plot(xs, ys)
if len(maxes):
plt.plot(xs[maxes], ys[maxes], "rx")
if len(mins):
plt.plot(xs[mins], ys[mins], "bx")
plt.show()