Files
zephyr_dsp/plotter.py

133 lines
3.9 KiB
Python

"""
Live serial plotter for audio_analysis firmware.
Reads prefixed lines from UART:
RAW:s0,s1,s2,... → time-domain waveform (decimated by 4)
FFT:m0,m1,m2,... → frequency-domain spectrum (first 128 bins)
Usage:
python plotter.py COM5 # Windows
python plotter.py /dev/ttyACM0 # Linux
Requirements:
pip install pyserial matplotlib numpy PyQt5
"""
import sys
import threading
import matplotlib
matplotlib.use("Qt5Agg")
import serial
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# ── Config ──────────────────────────────────────────
# Serial link speed passed to serial.Serial.
BAUD = 115_200
# Sampling rate in Hz; used to scale both plot axes.
SAMPLE_RATE = 44_100
# Samples per capture buffer; also the divisor for the frequency-bin spacing.
BUF_SIZE = 1_024
# Only every DECIMATION-th sample of a buffer is plotted.
DECIMATION = 4
# Points on the waveform plot (1024 // 4 == 256).
DISPLAY_SAMPLES = BUF_SIZE // DECIMATION
# Points on the spectrum plot.
DISPLAY_BINS = 128
# ── Serial setup ────────────────────────────────────
# The serial port name is the single required command-line argument.
if not sys.argv[1:]:
    print(f"Usage: python {sys.argv[0]} <COM_PORT>")
    sys.exit(1)

port = sys.argv[1]
# The read timeout keeps readline() from blocking indefinitely when the
# device is silent.
ser = serial.Serial(port=port, baudrate=BAUD, timeout=0.1)
# ── Plot setup ──────────────────────────────────────
# Two stacked axes: waveform on top, spectrum below.
fig, (ax_time, ax_freq) = plt.subplots(nrows=2, ncols=1, figsize=(10, 6))
fig.suptitle("Audio Analysis — Live")

# Time domain (decimated): plotted points are DECIMATION samples apart,
# so the x axis is sample index * DECIMATION / SAMPLE_RATE, in ms.
time_x = np.arange(DISPLAY_SAMPLES) * DECIMATION / SAMPLE_RATE * 1000
(time_line,) = ax_time.plot(time_x, np.zeros(DISPLAY_SAMPLES), color="cyan")
ax_time.set(
    xlim=(0, time_x[-1]),
    ylim=(-1.1, 1.1),
    xlabel="Time (ms)",
    ylabel="Amplitude",
    title="Waveform",
)
ax_time.grid(True, alpha=0.3)

# Frequency domain (first DISPLAY_BINS bins): bin spacing is
# SAMPLE_RATE / BUF_SIZE Hz.
freq_x = np.arange(DISPLAY_BINS) * SAMPLE_RATE / BUF_SIZE
(freq_line,) = ax_freq.plot(freq_x, np.zeros(DISPLAY_BINS), color="orange")
ax_freq.set(
    xlim=(0, freq_x[-1]),
    ylim=(0, 1),
    xlabel="Frequency (Hz)",
    ylabel="Magnitude",
    title="Spectrum",
)
ax_freq.grid(True, alpha=0.3)

plt.tight_layout()
# ── Thread-safe data storage ───────────────────────
# Latest waveform / spectrum frame, shared between the serial reader thread
# (which rebinds them) and the animation callback (which copies them).
# Both sides hold `lock` while touching these names.
lock = threading.Lock()
raw_data, fft_data = np.zeros(DISPLAY_SAMPLES), np.zeros(DISPLAY_BINS)
# ── Background serial reader thread ───────────────
def _parse_frame(payload, count, convert, dtype):
    """Turn a comma-separated payload into an array of exactly ``count`` values.

    Args:
        payload: Text following the ``RAW:`` / ``FFT:`` prefix.
        count: Number of values the corresponding plot line expects.
        convert: Callable applied to every non-empty field (``int`` or
            ``float``).
        dtype: NumPy dtype of the returned array.

    Returns:
        A 1-D array of length ``count`` (surplus trailing values are
        dropped), or ``None`` when a field cannot be converted or fewer than
        ``count`` values are present.
    """
    try:
        values = np.array(
            [convert(field) for field in payload.split(",") if field],
            dtype=dtype,
        )
    except (ValueError, OverflowError):
        return None
    # A short frame cannot be drawn against the fixed-length x axis, so it is
    # rejected here and the previous frame stays on screen.
    if values.size < count:
        return None
    return values[:count]


def serial_reader():
    """Read lines from ``ser`` and publish the newest RAW / FFT frame.

    Intended to run in a background thread. Each complete line is decoded as
    UTF-8 (undecodable bytes ignored) and dispatched on its prefix:

    * ``RAW:`` values are parsed as integers and rescaled with
      ``(x - 2048) / 2048`` before being stored in ``raw_data``
      (presumably 12-bit ADC counts centred on 2048; confirm against the
      firmware).
    * ``FFT:`` values are parsed as floats and stored in ``fft_data``.

    Lines with any other prefix, malformed numbers, or too few values are
    dropped. The function returns (ending the thread) once reading from the
    port fails.
    """
    global raw_data, fft_data

    # Upper bound for a line under reassembly, so a stream that never sends a
    # newline cannot grow the buffer without limit.
    max_pending = 1 << 16
    pending = b""

    while True:
        try:
            chunk = ser.readline()
        except Exception as exc:
            # Broad on purpose (as in the original): any failure of the read
            # itself means the port is unusable. Retrying immediately would
            # only busy-loop, so report it and stop instead. Stay quiet when
            # the port was closed deliberately during shutdown.
            if ser.is_open:
                print(f"Serial read failed: {exc}", file=sys.stderr)
            return

        if not chunk:
            continue  # read timeout with nothing received

        # readline() hands back whatever arrived before the timeout, so a
        # single line may come in several pieces; join them until the
        # terminating newline shows up.
        pending += chunk
        if not pending.endswith(b"\n"):
            if len(pending) > max_pending:
                pending = b""
            continue

        line = pending.decode("utf-8", errors="ignore").strip()
        pending = b""

        if line.startswith("RAW:"):
            samples = _parse_frame(line[4:], DISPLAY_SAMPLES, int, np.float32)
            if samples is not None:
                normalized = (samples - 2048.0) / 2048.0
                with lock:
                    raw_data = normalized
        elif line.startswith("FFT:"):
            mags = _parse_frame(line[4:], DISPLAY_BINS, float, np.float64)
            if mags is not None:
                with lock:
                    fft_data = mags
# Run the reader as a daemon thread so it does not keep the process alive
# after the main thread finishes.
reader_thread = threading.Thread(target=serial_reader)
reader_thread.daemon = True
reader_thread.start()
# ── Animation update ────────────────────────────────
def update(frame):
    """Redraw both plot lines from the most recently received frames.

    Args:
        frame: Frame counter supplied by ``FuncAnimation``; unused.

    Returns:
        The ``(time_line, freq_line)`` artists.
    """
    # Copy under the lock so the reader thread can keep publishing new
    # frames while the figure is being updated.
    with lock:
        rd = raw_data.copy()
        fd = fft_data.copy()

    # A frame whose length differs from the line's x-data would make the
    # next draw fail, so such frames are skipped and the old curve is kept.
    if rd.size == DISPLAY_SAMPLES:
        time_line.set_ydata(rd)

    if fd.size == DISPLAY_BINS:
        freq_line.set_ydata(fd)
        # Auto-scale the spectrum with 20 % headroom. Skip an all-zero frame
        # (keeps the previous limits) and a NaN/inf peak (not a valid limit).
        peak = np.max(fd) if fd.size else 0.0
        if np.isfinite(peak) and peak > 0:
            ax_freq.set_ylim(0, peak * 1.2)

    return time_line, freq_line
# Keep the animation bound to a name: FuncAnimation must stay referenced for
# as long as it should run, otherwise it can be garbage-collected.
ani = FuncAnimation(fig, update, interval=100, blit=False, cache_frame_data=False)
try:
    plt.show()
finally:
    # Release the port even if the GUI loop exits via an exception
    # (for example Ctrl+C in the terminal).
    ser.close()