Skip to content

Commit cb1f145

Browse files
authored
Merge pull request #1035 from quartiq/fls-utils
Fls-utils
2 parents 61f7df1 + c6271d9 commit cb1f145

File tree

8 files changed

+333
-13
lines changed

8 files changed

+333
-13
lines changed

py/pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ dependencies = [
1818
"numpy",
1919
"scipy",
2020
"matplotlib",
21-
"miniconf-mqtt@git+https://github.com/quartiq/miniconf@v0.19.0#subdirectory=py/miniconf-mqtt",
21+
"tqdm",
22+
"miniconf-mqtt@git+https://github.com/quartiq/miniconf@miniconf-v0.20.0#subdirectory=py/miniconf-mqtt",
2223
]
2324

2425
[project.urls]

py/stabilizer/search.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
"""Stabilizer FLS signal searcher"""
2+
3+
# pylint: disable=logging-fstring-interpolation,too-many-statements,too-many-locals,duplicate-code
4+
5+
6+
import asyncio
7+
import argparse
8+
import logging
9+
10+
import numpy as np
11+
from tqdm.asyncio import tqdm
12+
13+
import miniconf
14+
from miniconf.common import MQTTv5, one
15+
from stabilizer.stream import Stream, get_local_ip
16+
17+
from . import stream as _
18+
19+
_logger = logging.getLogger(__name__)
20+
21+
22+
async def main():
23+
"""FLS Signal searcher"""
24+
parser = argparse.ArgumentParser(
25+
description="Search demodulation frequency range and identify "
26+
"highest power input frequency"
27+
)
28+
parser.add_argument(
29+
"-v", "--verbose", action="count", default=0, help="Increase logging verbosity"
30+
)
31+
parser.add_argument(
32+
"--broker",
33+
"-b",
34+
default="mqtt",
35+
help="The MQTT broker address to use to use (%(default)s)",
36+
)
37+
parser.add_argument(
38+
"--prefix",
39+
"-p",
40+
default="dt/sinara/fls/+",
41+
help="The MQTT topic prefix (%(default)s)",
42+
)
43+
parser.add_argument(
44+
"--channel",
45+
"-c",
46+
type=int,
47+
choices=[0, 1],
48+
default=0,
49+
help="The channel to operate on",
50+
)
51+
parser.add_argument(
52+
"--port",
53+
type=int,
54+
default=9293,
55+
help="Local port to listen on for streaming data",
56+
)
57+
parser.add_argument(
58+
"--threshold",
59+
"-t",
60+
default=-15,
61+
type=float,
62+
help="log2 of tone power threshold",
63+
)
64+
65+
args = parser.parse_args()
66+
logging.basicConfig(
67+
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
68+
level=logging.WARN - 10 * args.verbose,
69+
)
70+
71+
async with miniconf.Client(
72+
args.broker,
73+
protocol=MQTTv5,
74+
logger=logging.getLogger("aiomqtt-client"),
75+
) as client:
76+
prefix, _alive = one(await miniconf.discover(client, args.prefix))
77+
78+
conf = miniconf.Miniconf(client, prefix)
79+
local_ip = get_local_ip(args.broker)
80+
_transport, stream = await Stream.open(args.port, local_ip)
81+
82+
f2w = (1 << 32) / 500e6
83+
f2s = 4 * 8 * 128 / 400e6
84+
f_demod = 0x200000 / f2w
85+
86+
async def measure(f, nframe=1):
87+
await conf.set(f"/ch/{args.channel}/input/freq", int(f * f2w))
88+
# settle
89+
# await asyncio.sleep(0.1)
90+
# discard pending frames
91+
while True:
92+
try:
93+
stream.queue.get_nowait()
94+
except asyncio.QueueEmpty:
95+
break
96+
# discard one more
97+
await stream.queue.get()
98+
demod = []
99+
for _ in range(nframe):
100+
frame = await stream.queue.get()
101+
demod.append(frame.demod()[:, args.channel])
102+
demod = np.concatenate(demod)
103+
p = np.log2(np.square(demod.astype(np.int64)).mean() / (1 << 62))
104+
iq = demod.astype(np.float64).ravel().view(np.complex128)
105+
df = np.angle(iq[1:] * iq[:-1].conj()).mean() / (2 * np.pi * f2s)
106+
return f + df, p
107+
108+
try:
109+
await conf.set("/stream", f"{local_ip}:{args.port}")
110+
fp = []
111+
with tqdm(np.arange(1e6, 240e6, 40e3)) as pbar:
112+
async for f in pbar:
113+
ff, p = await measure(f)
114+
fp.append((ff, p))
115+
if p > args.threshold:
116+
_logger.info(f"{ff:g} Hz: {p:g} 3dB")
117+
else:
118+
_logger.debug(f"{ff:g} Hz: {p:g} 3dB")
119+
if not fp:
120+
raise ValueError("no tones found above threshold")
121+
f0, _p0 = max(fp, key=lambda k: k[1])
122+
fu, pu = await measure(f0 + 2 * f_demod)
123+
_fl, pl = await measure(f0 - 2 * f_demod)
124+
if pu > pl:
125+
f0 = fu
126+
f0, p0 = await measure(f0, nframe=100)
127+
w = round(int(f0 * f2w))
128+
_logger.warning(
129+
f"final {f0:f} Hz, {p0:g} 3dB, ch/{args.channel}/input/freq={w:#x}"
130+
)
131+
await conf.set(f"/ch/{args.channel}/input/freq", w)
132+
finally:
133+
await conf.set("/stream", "0.0.0.0:0")
134+
135+
136+
if __name__ == "__main__":
137+
asyncio.run(main())

py/stabilizer/stream.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,29 @@ def to_si(self):
9999
)
100100

101101

102+
class Fls:
103+
"""FLS application stream format"""
104+
format_id = 2
105+
106+
def __init__(self, header, body):
107+
self.header = header
108+
self.body = body
109+
110+
def size(self):
111+
"""Return the data size of the frame in bytes"""
112+
return len(self.body)
113+
114+
def to_mu(self):
115+
"""Return the raw data in machine units"""
116+
data = np.frombuffer(self.body, "<i4")
117+
data = data.reshape(-1, 2, 7)
118+
return data
119+
120+
def demod(self):
121+
"""Return baseband signal"""
122+
return self.to_mu()[:, :, :2]
123+
124+
102125
class Frame:
103126
"""Stream frame constisting of a header and multiple data batches"""
104127

@@ -109,6 +132,7 @@ class Frame:
109132
parsers = {
110133
AdcDac.format_id: AdcDac,
111134
ThermostatEem.format_id: ThermostatEem,
135+
Fls.format_id: Fls,
112136
}
113137

114138
@classmethod

py/stabilizer/watch.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
"""Stabilizer FLS event watcher and recorder"""
2+
3+
# pylint: disable=logging-fstring-interpolation,too-many-statements
4+
5+
import asyncio
6+
import argparse
7+
import logging
8+
import json
9+
import time
10+
11+
import miniconf
12+
from miniconf.common import MQTTv5, one
13+
from stabilizer.stream import Frame, Stream, get_local_ip, wrap
14+
15+
from . import stream as _
16+
17+
_logger = logging.getLogger(__name__)
18+
19+
20+
async def main():
21+
"""FLS watcher"""
22+
parser = argparse.ArgumentParser(
23+
description="Watch telemetry messages for digital_input, blank, and slip "
24+
"events and dump buffered stream data around those triggers into files."
25+
)
26+
parser.add_argument(
27+
"-v", "--verbose", action="count", default=0, help="Increase logging verbosity"
28+
)
29+
parser.add_argument(
30+
"--broker",
31+
"-b",
32+
default="mqtt",
33+
help="The MQTT broker address to use to use (%(default)s)",
34+
)
35+
parser.add_argument(
36+
"--prefix",
37+
"-p",
38+
default="dt/sinara/fls/+",
39+
help="The MQTT topic prefix (%(default)s)",
40+
)
41+
parser.add_argument(
42+
"--channel",
43+
"-c",
44+
type=int,
45+
choices=[0, 1],
46+
default=0,
47+
help="The channel to operate on",
48+
)
49+
parser.add_argument(
50+
"--port",
51+
type=int,
52+
default=9293,
53+
help="Local port to listen on for streaming data",
54+
)
55+
parser.add_argument(
56+
"--buffer",
57+
"-n",
58+
default=1 << 12,
59+
type=int,
60+
help="number of packets to buffer (%(default)s)",
61+
)
62+
parser.add_argument(
63+
"--dump", "-r", default="fls_event", help="file name prefix, (%(default)s)"
64+
)
65+
parser.add_argument(
66+
"--max",
67+
"-m",
68+
default=20,
69+
type=int,
70+
help="max number of telemetry messages with active trigger"
71+
" before exitin to limit the disk usage. (%(default)s)",
72+
)
73+
args = parser.parse_args()
74+
75+
logging.basicConfig(
76+
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
77+
level=logging.WARN - 10 * args.verbose,
78+
)
79+
80+
local_ip = get_local_ip(args.broker)
81+
82+
async def dump(ev):
83+
transport, stream = await Stream.open(args.port, local_ip, maxsize=args.buffer)
84+
try:
85+
while True:
86+
await ev.wait()
87+
name = f"{args.dump}_{int(time.time())}.raw"
88+
_logger.warning(f"trigger start {name}")
89+
count = 0
90+
with open(name, "wb") as f:
91+
while ev.is_set():
92+
fr = await stream.queue.get()
93+
f.write(Frame.header_fmt.pack(*fr.header))
94+
f.write(fr.body)
95+
count += 1
96+
_logger.warning(f"trigger end, written {count} frames")
97+
finally:
98+
transport.close()
99+
100+
async def watch(ev):
101+
async with miniconf.Client(
102+
args.broker,
103+
protocol=MQTTv5,
104+
logger=logging.getLogger("aiomqtt-client"),
105+
) as client:
106+
prefix, _alive = one(await miniconf.discover(client, args.prefix))
107+
conf = miniconf.Miniconf(client, prefix)
108+
await conf.set("/stream", f"{local_ip}:{args.port}")
109+
await conf.close()
110+
111+
topic = f"{prefix}/telemetry"
112+
await client.subscribe(topic)
113+
try:
114+
last = None
115+
n = 0
116+
async for tele in client.messages:
117+
if n > args.max:
118+
break
119+
tele = json.loads(tele.payload)
120+
if trigger(tele, last, args.channel):
121+
ev.set()
122+
n += 1
123+
else:
124+
ev.clear()
125+
last = tele
126+
finally:
127+
ev.clear()
128+
await client.unsubscribe(topic)
129+
conf = miniconf.Miniconf(client, prefix)
130+
await conf.set("/stream", "0.0.0.0:0")
131+
await conf.close()
132+
133+
def trigger(new, last, channel):
134+
if last is None:
135+
return False
136+
if channel is None:
137+
channels = [0, 1]
138+
else:
139+
channels = [channel]
140+
for ch in channels:
141+
if wrap(new["raw"][ch]["holds"] - last["raw"][ch]["holds"]) > 0:
142+
continue
143+
blanks = wrap(new["raw"][ch]["blanks"] - last["raw"][ch]["blanks"])
144+
if blanks > 0:
145+
_logger.info(f"trigger ch{ch} on {blanks} blanks")
146+
return True
147+
slips = wrap(new["raw"][ch]["slips"] - last["raw"][ch]["slips"])
148+
if slips > 0:
149+
_logger.info(f"trigger ch{ch} on {slips} slips")
150+
return True
151+
return False
152+
153+
ev = asyncio.Event()
154+
await asyncio.gather(dump(ev), watch(ev))
155+
156+
157+
if __name__ == "__main__":
158+
asyncio.run(main())

src/bin/dds.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ mod app {
254254
}
255255

256256
#[task(priority = 1, shared=[usb, settings], local=[usb_terminal])]
257-
async fn usb(mut c: usb::Context) {
257+
async fn usb(mut c: usb::Context) -> ! {
258258
loop {
259259
c.shared.usb.lock(|usb| {
260260
usb.poll(&mut [c
@@ -275,7 +275,7 @@ mod app {
275275
}
276276

277277
#[task(priority = 1, shared=[network])]
278-
async fn ethernet_link(mut c: ethernet_link::Context) {
278+
async fn ethernet_link(mut c: ethernet_link::Context) -> ! {
279279
loop {
280280
c.shared.network.lock(|net| net.processor.handle_link());
281281
Systick::delay(1.secs()).await;

src/bin/dual-iir.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ mod app {
497497
}
498498

499499
#[task(priority = 1, shared=[network, settings, telemetry], local=[cpu_temp_sensor])]
500-
async fn telemetry(mut c: telemetry::Context) {
500+
async fn telemetry(mut c: telemetry::Context) -> ! {
501501
loop {
502502
let telemetry =
503503
c.shared.telemetry.lock(|telemetry| telemetry.clone());
@@ -526,7 +526,7 @@ mod app {
526526
}
527527

528528
#[task(priority = 1, shared=[usb, settings], local=[usb_terminal])]
529-
async fn usb(mut c: usb::Context) {
529+
async fn usb(mut c: usb::Context) -> ! {
530530
loop {
531531
// Handle the USB serial terminal.
532532
c.shared.usb.lock(|usb| {
@@ -548,7 +548,7 @@ mod app {
548548
}
549549

550550
#[task(priority = 1, shared=[network])]
551-
async fn ethernet_link(mut c: ethernet_link::Context) {
551+
async fn ethernet_link(mut c: ethernet_link::Context) -> ! {
552552
loop {
553553
c.shared.network.lock(|net| net.processor.handle_link());
554554
Systick::delay(1.secs()).await;

0 commit comments

Comments
 (0)