From 5b8ef4a0ecec11600a8b5026e208801427d188b5 Mon Sep 17 00:00:00 2001 From: Sangmin Lee Date: Sun, 5 Apr 2026 23:03:09 +0900 Subject: [PATCH 1/4] add speaker listen feature --- Makefile | 3 + cmd/tacit/main.go | 24 +++- pkg/capture/source.go | 13 ++ pkg/capture/speaker_darwin.go | 108 ++++++++++++++ pkg/capture/speaker_darwin.h | 19 +++ pkg/capture/speaker_darwin.m | 241 ++++++++++++++++++++++++++++++++ pkg/capture/speaker_e2e_test.go | 67 +++++++++ pkg/capture/speaker_stub.go | 22 +++ pkg/config/config.go | 11 +- pkg/pipeline/pipeline.go | 132 ++++++++++------- 10 files changed, 589 insertions(+), 51 deletions(-) create mode 100644 pkg/capture/source.go create mode 100644 pkg/capture/speaker_darwin.go create mode 100644 pkg/capture/speaker_darwin.h create mode 100644 pkg/capture/speaker_darwin.m create mode 100644 pkg/capture/speaker_e2e_test.go create mode 100644 pkg/capture/speaker_stub.go diff --git a/Makefile b/Makefile index cf26d92..5bc1d04 100644 --- a/Makefile +++ b/Makefile @@ -82,6 +82,9 @@ $(WHISPER_BUILD)/src/libwhisper.a: e2e-test: build ./tacit process testdata/test_voice_recording.m4a go test -tags integration -v -count=1 ./pkg/process/ -run TestClassifier +ifeq ($(UNAME_S),Darwin) + go test -tags "integration darwin" -v -count=1 -timeout 30s ./pkg/capture/ -run TestSpeaker_Stream_E2E +endif INSTALL_DIR := $(HOME)/.local/bin diff --git a/cmd/tacit/main.go b/cmd/tacit/main.go index 37ae6e9..b894b62 100644 --- a/cmd/tacit/main.go +++ b/cmd/tacit/main.go @@ -16,6 +16,7 @@ import ( "syscall" "time" + "github.com/sangmin7648/tacit/pkg/capture" "github.com/sangmin7648/tacit/pkg/config" "github.com/sangmin7648/tacit/pkg/daemon" "github.com/sangmin7648/tacit/pkg/pipeline" @@ -342,6 +343,27 @@ func cmdListen(cfg *config.Config) { } defer p.Close() + // Build audio sources. + mic, err := capture.New() + if err != nil { + log.Fatalf("Failed to init microphone: %v", err) + } + defer mic.Close() + + sources := []capture.AudioSource{mic} + + if cfg.CaptureSpeaker { + spk, err := capture.NewSpeaker() + if err != nil { + log.Printf("Warning: system audio capture unavailable: %v", err) + log.Printf("Continuing with microphone only.") + } else { + defer spk.Close() + sources = append(sources, spk) + log.Printf("System audio capture enabled (requires Screen Recording permission)") + } + } + // Setup signal handling for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) sigCh := make(chan os.Signal, 1) @@ -357,7 +379,7 @@ func cmdListen(cfg *config.Config) { log.Printf("Knowledge base: %s", config.BaseDir()) log.Printf("Press Ctrl+C to stop") - if err := p.Run(ctx); err != nil { + if err := p.Run(ctx, sources); err != nil { log.Printf("Pipeline error: %v", err) } log.Printf("tacit daemon stopped") diff --git a/pkg/capture/source.go b/pkg/capture/source.go new file mode 100644 index 0000000..c3aceb3 --- /dev/null +++ b/pkg/capture/source.go @@ -0,0 +1,13 @@ +package capture + +import "context" + +// AudioSource provides a stream of 16kHz mono int16 PCM audio samples. +// Both Mic and Speaker implement this interface. +type AudioSource interface { + // Stream starts capturing and returns a channel of sample chunks. + // The channel is closed when ctx is cancelled. + Stream(ctx context.Context) (<-chan []int16, error) + // Close releases all resources. + Close() +} diff --git a/pkg/capture/speaker_darwin.go b/pkg/capture/speaker_darwin.go new file mode 100644 index 0000000..e9b183b --- /dev/null +++ b/pkg/capture/speaker_darwin.go @@ -0,0 +1,108 @@ +//go:build darwin + +package capture + +/* +#cgo CFLAGS: -x objective-c -fobjc-arc +#cgo LDFLAGS: -framework ScreenCaptureKit -framework CoreMedia -framework AudioToolbox + +#include "speaker_darwin.h" +#include +*/ +import "C" + +import ( + "context" + "fmt" + "log" + "runtime/cgo" + "sync/atomic" + "unsafe" +) + +// Speaker captures system audio output (all apps) using ScreenCaptureKit. +// Requires macOS 13.0+ and Screen Recording permission. +// The captured audio is NOT affected by the microphone mute state. +type Speaker struct { + cap *C.SpeakerCapture +} + +// NewSpeaker creates a new system-audio capture instance and starts the +// ScreenCaptureKit stream. Returns an error if macOS < 13, permission is +// denied, or the stream cannot be initialised. +func NewSpeaker() (*Speaker, error) { + // We create a temporary placeholder handle; the real handle is set in + // Stream() once we have a channel to deliver samples into. + // speaker_create is called lazily in Stream so we can pass the channel handle. + return &Speaker{}, nil +} + +// Stream starts system-audio capture and returns a channel of int16 chunks. +// The channel is closed when ctx is cancelled. +func (s *Speaker) Stream(ctx context.Context) (<-chan []int16, error) { + ch := make(chan []int16, 128) + var stopped atomic.Bool + + // Register the channel in the cgo handle registry. + handle := cgo.NewHandle(ch) + + var errCStr *C.char + cap := C.speaker_create(C.uintptr_t(handle), &errCStr) + if errCStr != nil { + msg := C.GoString(errCStr) + C.free(unsafe.Pointer(errCStr)) + handle.Delete() + close(ch) + return nil, fmt.Errorf("speaker capture: %s", msg) + } + if cap == nil { + handle.Delete() + close(ch) + return nil, fmt.Errorf("speaker capture: unknown error") + } + + s.cap = cap + + go func() { + <-ctx.Done() + stopped.Store(true) + C.speaker_stop(cap) + s.cap = nil + handle.Delete() + close(ch) + }() + + log.Printf("System audio capture started (ScreenCaptureKit, 16kHz mono)") + return ch, nil +} + +// Close stops capture and releases resources if Stream was called. +func (s *Speaker) Close() { + if s.cap != nil { + C.speaker_stop(s.cap) + s.cap = nil + } +} + +// tacitSpeakerSamplesCallback is called from Objective-C on each audio chunk. +// It converts the C int16 array into a Go slice and sends it to the channel. +// +//export tacitSpeakerSamplesCallback +func tacitSpeakerSamplesCallback(h C.uintptr_t, samples *C.int16_t, count C.int) { + handle := cgo.Handle(h) + ch, ok := handle.Value().(chan []int16) + if !ok { + return + } + + n := int(count) + buf := unsafe.Slice((*int16)(unsafe.Pointer(samples)), n) + dst := make([]int16, n) + copy(dst, buf) + + select { + case ch <- dst: + default: + // Drop frame if the pipeline is slow — prevents audio thread stall. + } +} diff --git a/pkg/capture/speaker_darwin.h b/pkg/capture/speaker_darwin.h new file mode 100644 index 0000000..77b59db --- /dev/null +++ b/pkg/capture/speaker_darwin.h @@ -0,0 +1,19 @@ +#pragma once +#include +#include + +// Opaque handle returned by speaker_create. +typedef struct SpeakerCapture SpeakerCapture; + +// tacitSpeakerSamplesCallback is defined in speaker_darwin.go (//export). +// Declared here so the .m file can call it. +extern void tacitSpeakerSamplesCallback(uintptr_t handle, int16_t* samples, int count); + +// speaker_create starts ScreenCaptureKit system audio capture (macOS 13+). +// On success returns a non-NULL handle and sets *errMsg to NULL. +// On failure returns NULL and sets *errMsg to a malloc'd error string (caller must free). +// goHandle is passed through to tacitSpeakerSamplesCallback as the first argument. +SpeakerCapture* speaker_create(uintptr_t goHandle, char** errMsg); + +// speaker_stop stops the stream and releases resources. +void speaker_stop(SpeakerCapture* cap); diff --git a/pkg/capture/speaker_darwin.m b/pkg/capture/speaker_darwin.m new file mode 100644 index 0000000..b937ecc --- /dev/null +++ b/pkg/capture/speaker_darwin.m @@ -0,0 +1,241 @@ +// speaker_darwin.m — ScreenCaptureKit system-audio capture for macOS 13+. +// Compiled as Objective-C by CGo on Darwin. + +#import +#import +#import +#include +#include +#include +#include + +#include "speaker_darwin.h" + +// --------------------------------------------------------------------------- +// Stream output delegate +// --------------------------------------------------------------------------- + +API_AVAILABLE(macos(13.0)) +@interface TacitSpeakerOutput : NSObject +@property (atomic) uintptr_t goHandle; +@property (atomic) BOOL stopped; +@end + +@implementation TacitSpeakerOutput + +- (void)stream:(SCStream *)stream + didOutputSampleBuffer:(CMSampleBufferRef)sampleBuffer + ofType:(SCStreamOutputType)type API_AVAILABLE(macos(12.3)) { + + @autoreleasepool { + if (type != SCStreamOutputTypeAudio) return; + if (self.stopped) return; + + AudioBufferList audioBufferList; + CMBlockBufferRef blockBuffer = NULL; + + OSStatus status = CMSampleBufferGetAudioBufferListWithRetainedBlockBuffer( + sampleBuffer, + NULL, + &audioBufferList, + sizeof(audioBufferList), + NULL, + NULL, + kCMSampleBufferFlag_AudioBufferList_Assure16ByteAlignment, + &blockBuffer + ); + + if (status != noErr || blockBuffer == NULL) return; + + if (audioBufferList.mNumberBuffers < 1) { + CFRelease(blockBuffer); + return; + } + + // Mono: one buffer, float32 samples + AudioBuffer* buf = &audioBufferList.mBuffers[0]; + float* floatSamples = (float*)buf->mData; + int frameCount = (int)(buf->mDataByteSize / sizeof(float)); + + if (frameCount <= 0) { + CFRelease(blockBuffer); + return; + } + + // Convert float32 [-1, 1] → int16 + int16_t* intSamples = (int16_t*)malloc((size_t)frameCount * sizeof(int16_t)); + if (!intSamples) { + CFRelease(blockBuffer); + return; + } + + for (int i = 0; i < frameCount; i++) { + float f = floatSamples[i]; + if (f > 1.0f) f = 1.0f; + if (f < -1.0f) f = -1.0f; + intSamples[i] = (int16_t)(f * 32767.0f); + } + + tacitSpeakerSamplesCallback(self.goHandle, intSamples, frameCount); + + free(intSamples); + CFRelease(blockBuffer); + } +} + +- (void)stream:(SCStream *)stream didStopWithError:(NSError *)error API_AVAILABLE(macos(12.3)) { + if (error && !self.stopped) { + NSLog(@"[tacit] speaker stream stopped with error: %@", error.localizedDescription); + } +} + +@end + +// --------------------------------------------------------------------------- +// SpeakerCapture struct +// --------------------------------------------------------------------------- + +struct SpeakerCapture { + // Use void* to avoid ObjC types in the C header. + void* stream; // SCStream* (CFRetained) + void* output; // TacitSpeakerOutput* (CFRetained) +}; + +// --------------------------------------------------------------------------- +// speaker_create +// --------------------------------------------------------------------------- + +SpeakerCapture* speaker_create(uintptr_t goHandle, char** errMsg) { + *errMsg = NULL; + + if (@available(macOS 13.0, *)) { + // Use __block so we can assign inside the completion handler. + __block SCShareableContent* content = nil; + __block NSError* contentError = nil; + + dispatch_semaphore_t sem = dispatch_semaphore_create(0); + + [SCShareableContent getShareableContentWithCompletionHandler:^( + SCShareableContent* c, NSError* e) { + content = c; + contentError = e; + dispatch_semaphore_signal(sem); + }]; + + // Wait up to 10 seconds for permission / content enumeration. + long result = dispatch_semaphore_wait( + sem, dispatch_time(DISPATCH_TIME_NOW, 10 * NSEC_PER_SEC)); + + if (result != 0) { + *errMsg = strdup("timed out waiting for SCShareableContent (Screen Recording permission may be missing)"); + return NULL; + } + + if (contentError != nil) { + const char* desc = [[contentError localizedDescription] UTF8String]; + *errMsg = strdup(desc ? desc : "SCShareableContent error"); + return NULL; + } + + if (content == nil || content.displays.count == 0) { + *errMsg = strdup("no displays found — cannot create audio filter"); + return NULL; + } + + // Use the first (main) display; exclude no windows → captures all app audio. + SCDisplay* display = content.displays[0]; + SCContentFilter* filter = [[SCContentFilter alloc] + initWithDisplay:display + excludingWindows:@[]]; + + SCStreamConfiguration* config = [[SCStreamConfiguration alloc] init]; + config.capturesAudio = YES; + config.sampleRate = 16000; + config.channelCount = 1; + // Minimise video overhead — we don't use it but SCStream still + // requires a non-zero size. Use the smallest allowed value. + config.width = 2; + config.height = 2; + config.minimumFrameInterval = CMTimeMake(1, 1); // 1 fps — negligible overhead + + TacitSpeakerOutput* output = [[TacitSpeakerOutput alloc] init]; + output.goHandle = goHandle; + output.stopped = NO; + + NSError* streamError = nil; + SCStream* stream = [[SCStream alloc] initWithFilter:filter + configuration:config + delegate:output]; + + BOOL added = [stream addStreamOutput:output + type:SCStreamOutputTypeAudio + sampleHandlerQueue:dispatch_get_global_queue( + DISPATCH_QUEUE_PRIORITY_HIGH, 0) + error:&streamError]; + if (!added || streamError != nil) { + const char* desc = [[streamError localizedDescription] UTF8String]; + *errMsg = strdup(desc ? desc : "failed to add stream output"); + return NULL; + } + + // Start the stream (async completion). + __block NSError* startError = nil; + dispatch_semaphore_t startSem = dispatch_semaphore_create(0); + + [stream startCaptureWithCompletionHandler:^(NSError* e) { + startError = e; + dispatch_semaphore_signal(startSem); + }]; + + dispatch_semaphore_wait( + startSem, dispatch_time(DISPATCH_TIME_NOW, 10 * NSEC_PER_SEC)); + + if (startError != nil) { + const char* desc = [[startError localizedDescription] UTF8String]; + *errMsg = strdup(desc ? desc : "failed to start SCStream"); + return NULL; + } + + // Retain Obj-C objects across the CGo boundary using CFBridgingRetain. + SpeakerCapture* cap = (SpeakerCapture*)calloc(1, sizeof(SpeakerCapture)); + cap->stream = (void*)CFBridgingRetain(stream); + cap->output = (void*)CFBridgingRetain(output); + + return cap; + } else { + *errMsg = strdup("system audio capture requires macOS 13.0 or later"); + return NULL; + } +} + +// --------------------------------------------------------------------------- +// speaker_stop +// --------------------------------------------------------------------------- + +void speaker_stop(SpeakerCapture* cap) { + if (!cap) return; + + if (@available(macOS 13.0, *)) { + if (cap->output) { + TacitSpeakerOutput* output = + (__bridge TacitSpeakerOutput*)cap->output; + output.stopped = YES; + } + + if (cap->stream) { + SCStream* stream = (__bridge SCStream*)cap->stream; + [stream stopCaptureWithCompletionHandler:^(NSError* e) { + // Ignore stop errors. + }]; + CFRelease(cap->stream); + cap->stream = NULL; + } + + if (cap->output) { + CFRelease(cap->output); + cap->output = NULL; + } + } + + free(cap); +} diff --git a/pkg/capture/speaker_e2e_test.go b/pkg/capture/speaker_e2e_test.go new file mode 100644 index 0000000..062ae49 --- /dev/null +++ b/pkg/capture/speaker_e2e_test.go @@ -0,0 +1,67 @@ +//go:build integration && darwin + +package capture + +import ( + "context" + "testing" + "time" +) + +// TestSpeaker_Stream_E2E verifies that the Speaker can start a ScreenCaptureKit +// stream and deliver audio chunks within a 5-second window. +// +// Prerequisites: +// - macOS 13.0+ +// - Screen Recording permission granted to the terminal / test runner +// (System Preferences → Privacy & Security → Screen Recording) +// +// Run with: +// +// go test -tags "integration darwin" -v -run TestSpeaker_Stream_E2E ./pkg/capture/ +func TestSpeaker_Stream_E2E(t *testing.T) { + spk, err := NewSpeaker() + if err != nil { + t.Fatalf("NewSpeaker: %v", err) + } + defer spk.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ch, err := spk.Stream(ctx) + if err != nil { + // Permission denied is an environment issue, not a code bug — skip gracefully. + t.Skipf("Stream: %v\n\tGrant Screen Recording permission to Terminal in System Preferences → Privacy & Security → Screen Recording", err) + } + + var totalSamples int + var chunkCount int + deadline := time.After(5 * time.Second) + +loop: + for { + select { + case samples, ok := <-ch: + if !ok { + t.Log("channel closed") + break loop + } + chunkCount++ + totalSamples += len(samples) + case <-deadline: + cancel() + break loop + } + } + + t.Logf("received %d chunks, %d total int16 samples (%.2fs of audio at 16kHz)", + chunkCount, totalSamples, float64(totalSamples)/16000.0) + + if chunkCount == 0 { + t.Fatal("no audio chunks received — ScreenCaptureKit stream may not be delivering data") + } + if totalSamples == 0 { + t.Fatal("received chunks but all were empty") + } +} diff --git a/pkg/capture/speaker_stub.go b/pkg/capture/speaker_stub.go new file mode 100644 index 0000000..2ef4334 --- /dev/null +++ b/pkg/capture/speaker_stub.go @@ -0,0 +1,22 @@ +//go:build !darwin + +package capture + +import ( + "context" + "fmt" +) + +// Speaker is a no-op stub on non-Darwin platforms. +type Speaker struct{} + +// NewSpeaker always returns an error on non-Darwin platforms. +func NewSpeaker() (*Speaker, error) { + return nil, fmt.Errorf("system audio capture is only supported on macOS") +} + +func (s *Speaker) Stream(_ context.Context) (<-chan []int16, error) { + return nil, fmt.Errorf("system audio capture is only supported on macOS") +} + +func (s *Speaker) Close() {} diff --git a/pkg/config/config.go b/pkg/config/config.go index 6c560db..8742387 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -25,6 +25,10 @@ type Config struct { LLMProvider string `yaml:"llm_provider"` LLMModel string `yaml:"llm_model"` SkillAgent string `yaml:"skill_agent"` + // CaptureSpeaker enables system-audio capture via ScreenCaptureKit (macOS 13+). + // When true, audio from speakers (Google Meet, YouTube, etc.) is also + // transcribed and stored. Requires Screen Recording permission. + CaptureSpeaker bool `yaml:"capture_speaker"` } // DefaultConfig returns a Config populated with default values. @@ -38,6 +42,7 @@ func DefaultConfig() *Config { LLMProvider: "ollama", LLMModel: "qwen3.5", SkillAgent: "claude", + CaptureSpeaker: true, } } @@ -111,7 +116,8 @@ func WriteDefault(path string) error { "energy_threshold: %.0f\n"+ "llm_provider: %s\n"+ "llm_model: %s\n"+ - "skill_agent: %s\n", + "skill_agent: %s\n"+ + "capture_speaker: %v\n", cfg.WhisperModel, formatDuration(cfg.MinSpeechDur), formatDuration(cfg.SilenceDuration), @@ -120,6 +126,7 @@ func WriteDefault(path string) error { cfg.LLMProvider, cfg.LLMModel, cfg.SkillAgent, + cfg.CaptureSpeaker, ) return os.WriteFile(path, []byte(content), 0644) } @@ -142,6 +149,7 @@ func WriteOverrideTemplate(path string, defaults *Config) error { fmt.Sprintf("llm_provider: %s", defaults.LLMProvider), fmt.Sprintf("llm_model: %s", defaults.LLMModel), fmt.Sprintf("skill_agent: %s", defaults.SkillAgent), + fmt.Sprintf("capture_speaker: %v", defaults.CaptureSpeaker), } var sb strings.Builder @@ -230,6 +238,7 @@ func WriteSetupOverride(path string, provider, model, agent string) error { {"llm_provider", provider, true}, {"llm_model", model, true}, {"skill_agent", agent, true}, + {"capture_speaker", fmt.Sprintf("%v", defaults.CaptureSpeaker), false}, } var sb strings.Builder diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 9823eff..2cf5379 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -27,6 +27,7 @@ var ErrSkipped = errors.New("content classified as meaningless, skipping") type Pipeline struct { cfg *config.Config whisper *stt.Whisper + whisperMu sync.Mutex // serialises concurrent STT calls from multiple sources classifier process.Classifier baseDir string } @@ -74,50 +75,82 @@ type classifyItem struct { timestamp time.Time } -// Run starts the real-time capture→VAD→STT→classify→store loop. -// STT runs synchronously; classification runs asynchronously in background -// with batching to amortize CLI startup cost. -// It blocks until ctx is cancelled. -func (p *Pipeline) Run(ctx context.Context) error { - // Init VAD (256 samples = 16ms at 16kHz) +// Run starts one or more audio sources through the VAD→STT→classify→store +// loop. Each source runs in its own goroutine; STT calls are serialised by a +// mutex so the shared Whisper instance is used safely. +// It blocks until ctx is cancelled or all sources exit. +func (p *Pipeline) Run(ctx context.Context, sources []capture.AudioSource) error { + if len(sources) == 0 { + return fmt.Errorf("no audio sources configured") + } + + // Single shared classify channel / worker so batching still works across + // multiple capture sources. + classifyCh := make(chan classifyItem, 64) + var classifyWg sync.WaitGroup + classifyWg.Add(1) + go func() { + defer classifyWg.Done() + p.classifyLoop(ctx, classifyCh) + }() + + // Start one VAD+STT goroutine per source. + var sourceWg sync.WaitGroup + for i, src := range sources { + sourceWg.Add(1) + label := sourceLabel(i, len(sources)) + go func(src capture.AudioSource, label string) { + defer sourceWg.Done() + if err := p.runSource(ctx, src, label, classifyCh); err != nil { + log.Printf("[%s] source error: %v", label, err) + } + }(src, label) + } + + sourceWg.Wait() + close(classifyCh) + classifyWg.Wait() + return nil +} + +// sourceLabel returns a short display name for the nth source. +func sourceLabel(i, total int) string { + if total == 1 { + return "mic" + } + switch i { + case 0: + return "mic" + case 1: + return "speaker" + default: + return fmt.Sprintf("src%d", i) + } +} + +// runSource runs a single audio source through VAD→STT and enqueues results +// onto classifyCh. It returns when ctx is cancelled or the source stream +// closes. +func (p *Pipeline) runSource(ctx context.Context, src capture.AudioSource, label string, classifyCh chan<- classifyItem) error { + // Init per-source VAD (256 samples = 16 ms at 16 kHz). const hopSize = 256 v, err := vad.New(hopSize, float32(p.cfg.SpeechThreshold)) if err != nil { return fmt.Errorf("init vad: %w", err) } defer v.Close() - log.Printf("VAD initialized (hop=%d, threshold=%.2f, energy=%.0f)", hopSize, p.cfg.SpeechThreshold, p.cfg.EnergyThreshold) - // Init microphone capture - mic, err := capture.New() + stream, err := src.Stream(ctx) if err != nil { - return fmt.Errorf("init capture: %w", err) + return fmt.Errorf("start stream: %w", err) } - defer mic.Close() - - stream, err := mic.Stream(ctx) - if err != nil { - return fmt.Errorf("start capture stream: %w", err) - } - - // Start async classify worker - classifyCh := make(chan classifyItem, 32) - var classifyWg sync.WaitGroup - classifyWg.Add(1) - go func() { - defer classifyWg.Done() - p.classifyLoop(ctx, classifyCh) - }() - // Segment buffer for accumulating speech audio segBuf := audio.NewSegmentBuffer(audio.SampleRate, p.cfg.MinSpeechDur) - - // VAD frame buffer: accumulate samples until we have hopSize var frameBuf []int16 silenceFrames := 0 silenceLimit := int(p.cfg.SilenceDuration.Seconds() * float64(audio.SampleRate) / float64(hopSize)) - log.Printf("Listening for speech... (silence timeout: %v, min speech: %v)", p.cfg.SilenceDuration, p.cfg.MinSpeechDur) + log.Printf("[%s] listening (silence=%v, minSpeech=%v)", label, p.cfg.SilenceDuration, p.cfg.MinSpeechDur) for chunk := range stream { frameBuf = append(frameBuf, chunk...) @@ -129,11 +162,11 @@ func (p *Pipeline) Run(ctx context.Context) error { _, isSpeech, err := v.Process(frame) if err != nil { - log.Printf("VAD error: %v", err) + log.Printf("[%s] VAD error: %v", label, err) continue } - // Energy gate: ignore VAD result if frame RMS is below threshold + // Energy gate. if isSpeech && p.cfg.EnergyThreshold > 0 { var sum float64 for _, s := range frame { @@ -149,7 +182,7 @@ func (p *Pipeline) Run(ctx context.Context) error { silenceFrames = 0 if !segBuf.IsActive() { segBuf.Start() - log.Printf("Speech started") + log.Printf("[%s] speech started", label) } segBuf.Append(audio.Int16ToFloat32(frame)) } else if segBuf.IsActive() { @@ -157,56 +190,56 @@ func (p *Pipeline) Run(ctx context.Context) error { silenceFrames++ if silenceFrames >= silenceLimit { - log.Printf("Speech ended (%.1fs)", segBuf.Duration().Seconds()) + log.Printf("[%s] speech ended (%.1fs)", label, segBuf.Duration().Seconds()) seg, ok := segBuf.Finish() silenceFrames = 0 if ok { - p.transcribeAndQueue(ctx, seg, classifyCh) + p.transcribeAndQueue(ctx, seg, label, classifyCh) } else { - log.Printf("Segment too short, discarding") + log.Printf("[%s] segment too short, discarding", label) } } } } - // Compact: move unprocessed samples to front to prevent memory leak + // Compact: move unprocessed samples to front to prevent memory leak. n := copy(frameBuf, frameBuf[processed:]) frameBuf = frameBuf[:n] } - close(classifyCh) - classifyWg.Wait() return nil } -// transcribeAndQueue runs STT synchronously and queues the text for async classification. -func (p *Pipeline) transcribeAndQueue(ctx context.Context, seg *audio.AudioSegment, ch chan<- classifyItem) { - log.Printf("Processing segment: %.1fs of audio", seg.Duration.Seconds()) +// transcribeAndQueue runs STT (serialised across sources) then queues text +// for async classification. +func (p *Pipeline) transcribeAndQueue(ctx context.Context, seg *audio.AudioSegment, label string, ch chan<- classifyItem) { + log.Printf("[%s] transcribing %.1fs of audio", label, seg.Duration.Seconds()) + p.whisperMu.Lock() text, err := p.whisper.Transcribe(ctx, seg.Samples, p.cfg.InitialPrompt) + p.whisperMu.Unlock() + if err != nil { - log.Printf("STT error: %v", err) + log.Printf("[%s] STT error: %v", label, err) return } if text == "" { - log.Printf("STT produced empty text, skipping") + log.Printf("[%s] STT produced empty text, skipping", label) return } - log.Printf("STT: %s", text) + log.Printf("[%s] STT: %s", label, text) ch <- classifyItem{text: text, timestamp: time.Now()} } -// classifyLoop processes classify items from the channel, batching when multiple -// items are queued up (e.g. during a long classification call). +// classifyLoop processes classify items from the channel, batching when +// multiple items are queued (e.g. during a long classification call). func (p *Pipeline) classifyLoop(ctx context.Context, ch <-chan classifyItem) { for { - // Block waiting for first item item, ok := <-ch if !ok { return } - // Drain any additional queued items for batching batch := []classifyItem{item} drain: for { @@ -309,7 +342,9 @@ func (p *Pipeline) ProcessFile(ctx context.Context, audioPath string) (string, e } log.Printf("Running STT...") + p.whisperMu.Lock() text, err := p.whisper.Transcribe(ctx, samples, p.cfg.InitialPrompt) + p.whisperMu.Unlock() if err != nil { return "", fmt.Errorf("transcribe: %w", err) } @@ -341,4 +376,3 @@ func (p *Pipeline) ProcessFile(ctx context.Context, audioPath string) (string, e return filePath, nil } - From 0dbcde7e26a8be0947eecec59b2ee05cf9b72ca5 Mon Sep 17 00:00:00 2001 From: Sangmin Lee Date: Sun, 5 Apr 2026 23:24:39 +0900 Subject: [PATCH 2/4] add stt source setup --- Makefile | 4 ++ cmd/tacit/main.go | 152 ++++++++++++++++++++++++++++++++++++++----- pkg/config/config.go | 18 +++-- 3 files changed, 154 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index 5bc1d04..31b69f4 100644 --- a/Makefile +++ b/Makefile @@ -95,6 +95,10 @@ install: build ifeq ($(UNAME_S),Darwin) rm -rf $(INSTALL_DIR)/ten_vad.framework cp -R ten_vad.framework $(INSTALL_DIR)/ten_vad.framework + xattr -dr com.apple.quarantine $(INSTALL_DIR)/tacit-dev 2>/dev/null || true + xattr -dr com.apple.quarantine $(INSTALL_DIR)/ten_vad.framework 2>/dev/null || true + codesign --force --deep --sign - $(INSTALL_DIR)/tacit-dev 2>/dev/null || true + codesign --force --deep --sign - $(INSTALL_DIR)/ten_vad.framework 2>/dev/null || true endif @echo "Installed to $(INSTALL_DIR)/tacit-dev" diff --git a/cmd/tacit/main.go b/cmd/tacit/main.go index b894b62..dd0987c 100644 --- a/cmd/tacit/main.go +++ b/cmd/tacit/main.go @@ -109,7 +109,7 @@ func cmdSetup() { var llmProvider, llmModel string // Step 1: LLM provider - fmt.Println("Step 1/3: Select LLM provider for summarization") + fmt.Println("Step 1/4: Select LLM provider for summarization") providerIdx := selectOption([]string{"ollama", "claude"}, 0) fmt.Println() @@ -118,7 +118,7 @@ func cmdSetup() { llmProvider = "claude" // Step 2: Claude model - fmt.Println("Step 2/3: Select Claude model") + fmt.Println("Step 2/4: Select Claude model") modelIdx := selectOption([]string{"haiku", "sonnet", "opus"}, 0) fmt.Println() switch modelIdx { @@ -135,7 +135,7 @@ func cmdSetup() { // Step 2: Ollama model (text input) reader := bufio.NewReader(os.Stdin) - fmt.Println("Step 2/3: Enter Ollama model name") + fmt.Println("Step 2/4: Enter Ollama model name") fmt.Print(" Model name [qwen3.5]: ") input := strings.TrimSpace(readLine(reader)) fmt.Println() @@ -147,24 +147,40 @@ func cmdSetup() { } // Step 3: AI agent for skill installation (only claude supported) - fmt.Println("Step 3/3: Select AI agent for skill installation") + fmt.Println("Step 3/4: Select AI agent for skill installation") agentNames := []string{"claude"} agentIdx := selectOption(agentNames, 0) skillAgent := agentNames[agentIdx] fmt.Println() + // Step 4: Audio sources (multi-select) — at least one must be selected. + var captureMic, captureSpeaker bool + for { + fmt.Println("Step 4/4: Select audio sources to listen (Space to toggle, Enter to confirm)") + sourceSelected := selectMultiple([]string{"mic", "speaker"}, []bool{true, true}) + fmt.Println() + captureMic, captureSpeaker = sourceSelected[0], sourceSelected[1] + if captureMic || captureSpeaker { + break + } + fmt.Println(" At least one source must be selected. Please try again.") + fmt.Println() + } + fmt.Println() - fmt.Printf(" LLM provider : %s\n", llmProvider) - fmt.Printf(" LLM model : %s\n", llmModel) - fmt.Printf(" Skill agent : %s\n", skillAgent) + fmt.Printf(" LLM provider : %s\n", llmProvider) + fmt.Printf(" LLM model : %s\n", llmModel) + fmt.Printf(" Skill agent : %s\n", skillAgent) + fmt.Printf(" Capture mic : %v\n", captureMic) + fmt.Printf(" Capture speaker: %v\n", captureSpeaker) fmt.Println() - // Write LLM settings to config-override.yaml + // Write settings to config-override.yaml overridePath := config.OverridePath() if err := os.MkdirAll(filepath.Dir(overridePath), 0755); err != nil { log.Fatalf("Failed to create config directory: %v", err) } - if err := config.WriteSetupOverride(overridePath, llmProvider, llmModel, skillAgent); err != nil { + if err := config.WriteSetupOverride(overridePath, llmProvider, llmModel, skillAgent, captureMic, captureSpeaker); err != nil { log.Fatalf("Failed to write config override: %v", err) } fmt.Printf("Saved settings: %s\n", overridePath) @@ -286,6 +302,100 @@ func selectOption(options []string, defaultIdx int) int { return cur } +// selectMultiple presents an interactive checkbox menu on stdout. Arrow keys +// move the cursor; Space toggles the current item; Enter confirms. Returns a +// slice of booleans aligned with options indicating the selected state. +// defaultSelected sets the initial checked state for each option. +func selectMultiple(options []string, defaultSelected []bool) []bool { + selected := make([]bool, len(options)) + copy(selected, defaultSelected) + cur := 0 + + fd := int(os.Stdin.Fd()) + oldState, err := term.MakeRaw(fd) + if err != nil { + // Fallback: numbered list + for i, o := range options { + mark := " " + if selected[i] { + mark = "x" + } + fmt.Printf(" [%s] %d) %s\n", mark, i+1, o) + } + fmt.Print("Toggle items by number (space-separated), then press Enter: ") + reader := bufio.NewReader(os.Stdin) + line := strings.TrimSpace(readLine(reader)) + for _, tok := range strings.Fields(line) { + for i := range options { + if tok == fmt.Sprintf("%d", i+1) { + selected[i] = !selected[i] + } + } + } + return selected + } + defer term.Restore(fd, oldState) + + draw := func(atTop bool) { + if !atTop { + fmt.Printf("\033[%dA", len(options)) + } + for i, o := range options { + fmt.Print("\r\033[2K") + mark := " " + if selected[i] { + mark = "x" + } + if i == cur { + fmt.Printf(" \033[36m> [%s] %s\033[0m\n", mark, o) + } else { + fmt.Printf(" [%s] %s\n", mark, o) + } + } + } + + draw(true) + + buf := make([]byte, 4) + for { + n, readErr := os.Stdin.Read(buf) + if readErr != nil || n == 0 { + break + } + switch { + case n == 1 && (buf[0] == '\r' || buf[0] == '\n'): // Enter — confirm + fmt.Printf("\033[%dA", len(options)) + for range options { + fmt.Print("\r\033[2K\n") + } + fmt.Printf("\033[%dA", len(options)) + for i, o := range options { + mark := " " + if selected[i] { + mark = "x" + } + fmt.Printf("\r\033[2K [%s] %s\n", mark, o) + } + return selected + case n == 1 && buf[0] == ' ': // Space — toggle + selected[cur] = !selected[cur] + draw(false) + case n >= 3 && buf[0] == 0x1b && buf[1] == '[' && buf[2] == 'A': // Up + if cur > 0 { + cur-- + draw(false) + } + case n >= 3 && buf[0] == 0x1b && buf[1] == '[' && buf[2] == 'B': // Down + if cur < len(options)-1 { + cur++ + draw(false) + } + } + } + + return selected +} + // cmdProcess handles the "process" subcommand: audio file → knowledge entry. func cmdProcess(cfg *config.Config) { if len(os.Args) < 3 { @@ -344,18 +454,24 @@ func cmdListen(cfg *config.Config) { defer p.Close() // Build audio sources. - mic, err := capture.New() - if err != nil { - log.Fatalf("Failed to init microphone: %v", err) - } - defer mic.Close() + var sources []capture.AudioSource - sources := []capture.AudioSource{mic} + if cfg.CaptureMic { + mic, err := capture.New() + if err != nil { + log.Fatalf("Failed to init microphone: %v", err) + } + defer mic.Close() + sources = append(sources, mic) + } if cfg.CaptureSpeaker { spk, err := capture.NewSpeaker() if err != nil { log.Printf("Warning: system audio capture unavailable: %v", err) + if len(sources) == 0 { + log.Fatalf("No audio sources available.") + } log.Printf("Continuing with microphone only.") } else { defer spk.Close() @@ -364,6 +480,10 @@ func cmdListen(cfg *config.Config) { } } + if len(sources) == 0 { + log.Fatalf("No audio sources configured. Enable capture_mic or capture_speaker in config.") + } + // Setup signal handling for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) sigCh := make(chan os.Signal, 1) @@ -447,6 +567,8 @@ func cmdConfigView(cfg *config.Config) { fmt.Printf("%-22s %-20s %s\n", "llm_provider:", cfg.LLMProvider, tag("llm_provider")) fmt.Printf("%-22s %-20s %s\n", "llm_model:", cfg.LLMModel, tag("llm_model")) fmt.Printf("%-22s %-20s %s\n", "skill_agent:", cfg.SkillAgent, tag("skill_agent")) + fmt.Printf("%-22s %-20v %s\n", "capture_mic:", cfg.CaptureMic, tag("capture_mic")) + fmt.Printf("%-22s %-20v %s\n", "capture_speaker:", cfg.CaptureSpeaker, tag("capture_speaker")) } // cmdConfigEdit opens the user override config file in a text editor. diff --git a/pkg/config/config.go b/pkg/config/config.go index 8742387..802c6a5 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -25,6 +25,9 @@ type Config struct { LLMProvider string `yaml:"llm_provider"` LLMModel string `yaml:"llm_model"` SkillAgent string `yaml:"skill_agent"` + // CaptureMic enables microphone capture. When true, speech from the + // microphone is transcribed and stored. Defaults to true. + CaptureMic bool `yaml:"capture_mic"` // CaptureSpeaker enables system-audio capture via ScreenCaptureKit (macOS 13+). // When true, audio from speakers (Google Meet, YouTube, etc.) is also // transcribed and stored. Requires Screen Recording permission. @@ -42,6 +45,7 @@ func DefaultConfig() *Config { LLMProvider: "ollama", LLMModel: "qwen3.5", SkillAgent: "claude", + CaptureMic: true, CaptureSpeaker: true, } } @@ -117,6 +121,7 @@ func WriteDefault(path string) error { "llm_provider: %s\n"+ "llm_model: %s\n"+ "skill_agent: %s\n"+ + "capture_mic: %v\n"+ "capture_speaker: %v\n", cfg.WhisperModel, formatDuration(cfg.MinSpeechDur), @@ -126,6 +131,7 @@ func WriteDefault(path string) error { cfg.LLMProvider, cfg.LLMModel, cfg.SkillAgent, + cfg.CaptureMic, cfg.CaptureSpeaker, ) return os.WriteFile(path, []byte(content), 0644) @@ -149,6 +155,7 @@ func WriteOverrideTemplate(path string, defaults *Config) error { fmt.Sprintf("llm_provider: %s", defaults.LLMProvider), fmt.Sprintf("llm_model: %s", defaults.LLMModel), fmt.Sprintf("skill_agent: %s", defaults.SkillAgent), + fmt.Sprintf("capture_mic: %v", defaults.CaptureMic), fmt.Sprintf("capture_speaker: %v", defaults.CaptureSpeaker), } @@ -206,10 +213,10 @@ func PIDPath() string { } // WriteSetupOverride writes a full override template with llm_provider, -// llm_model, and skill_agent set to the given values (uncommented). Other -// fields are preserved from the existing override file if present; otherwise -// they remain commented out with their default values. -func WriteSetupOverride(path string, provider, model, agent string) error { +// llm_model, skill_agent, capture_mic, and capture_speaker set to the given +// values (uncommented). Other fields are preserved from the existing override +// file if present; otherwise they remain commented out with their default values. +func WriteSetupOverride(path string, provider, model, agent string, captureMic, captureSpeaker bool) error { // Load existing override values to preserve non-LLM user settings. existing := map[string]interface{}{} if data, err := os.ReadFile(path); err == nil { @@ -238,7 +245,8 @@ func WriteSetupOverride(path string, provider, model, agent string) error { {"llm_provider", provider, true}, {"llm_model", model, true}, {"skill_agent", agent, true}, - {"capture_speaker", fmt.Sprintf("%v", defaults.CaptureSpeaker), false}, + {"capture_mic", fmt.Sprintf("%v", captureMic), true}, + {"capture_speaker", fmt.Sprintf("%v", captureSpeaker), true}, } var sb strings.Builder From 9df7138b65f097d2c0a1c9feffd6851ac877ef90 Mon Sep 17 00:00:00 2001 From: Sangmin Lee Date: Sun, 5 Apr 2026 23:28:52 +0900 Subject: [PATCH 3/4] simplify: fix speaker label bug and remove dead stopped flag - Pipeline.Run now takes explicit labels[]string so each source is logged with the correct name; removes the hardcoded sourceLabel() that always returned "mic" for single-source (broken when only speaker is enabled) - Remove unused atomic.Bool stopped from speaker_darwin.go (the ObjC side already guards via output.stopped) - Fix NewSpeaker doc comment: stream starts lazily in Stream(), not in the constructor Co-Authored-By: Claude Sonnet 4.6 --- cmd/tacit/main.go | 5 ++++- pkg/capture/speaker_darwin.go | 8 ++------ pkg/pipeline/pipeline.go | 20 +++----------------- 3 files changed, 9 insertions(+), 24 deletions(-) diff --git a/cmd/tacit/main.go b/cmd/tacit/main.go index dd0987c..c6a6e08 100644 --- a/cmd/tacit/main.go +++ b/cmd/tacit/main.go @@ -455,6 +455,7 @@ func cmdListen(cfg *config.Config) { // Build audio sources. var sources []capture.AudioSource + var sourceLabels []string if cfg.CaptureMic { mic, err := capture.New() @@ -463,6 +464,7 @@ func cmdListen(cfg *config.Config) { } defer mic.Close() sources = append(sources, mic) + sourceLabels = append(sourceLabels, "mic") } if cfg.CaptureSpeaker { @@ -476,6 +478,7 @@ func cmdListen(cfg *config.Config) { } else { defer spk.Close() sources = append(sources, spk) + sourceLabels = append(sourceLabels, "speaker") log.Printf("System audio capture enabled (requires Screen Recording permission)") } } @@ -499,7 +502,7 @@ func cmdListen(cfg *config.Config) { log.Printf("Knowledge base: %s", config.BaseDir()) log.Printf("Press Ctrl+C to stop") - if err := p.Run(ctx, sources); err != nil { + if err := p.Run(ctx, sources, sourceLabels); err != nil { log.Printf("Pipeline error: %v", err) } log.Printf("tacit daemon stopped") diff --git a/pkg/capture/speaker_darwin.go b/pkg/capture/speaker_darwin.go index e9b183b..6d5aef6 100644 --- a/pkg/capture/speaker_darwin.go +++ b/pkg/capture/speaker_darwin.go @@ -16,7 +16,6 @@ import ( "fmt" "log" "runtime/cgo" - "sync/atomic" "unsafe" ) @@ -27,9 +26,8 @@ type Speaker struct { cap *C.SpeakerCapture } -// NewSpeaker creates a new system-audio capture instance and starts the -// ScreenCaptureKit stream. Returns an error if macOS < 13, permission is -// denied, or the stream cannot be initialised. +// NewSpeaker creates a new system-audio capture instance. +// The ScreenCaptureKit stream is started lazily in Stream(). func NewSpeaker() (*Speaker, error) { // We create a temporary placeholder handle; the real handle is set in // Stream() once we have a channel to deliver samples into. @@ -41,7 +39,6 @@ func NewSpeaker() (*Speaker, error) { // The channel is closed when ctx is cancelled. func (s *Speaker) Stream(ctx context.Context) (<-chan []int16, error) { ch := make(chan []int16, 128) - var stopped atomic.Bool // Register the channel in the cgo handle registry. handle := cgo.NewHandle(ch) @@ -65,7 +62,6 @@ func (s *Speaker) Stream(ctx context.Context) (<-chan []int16, error) { go func() { <-ctx.Done() - stopped.Store(true) C.speaker_stop(cap) s.cap = nil handle.Delete() diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 2cf5379..99230f7 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -78,8 +78,9 @@ type classifyItem struct { // Run starts one or more audio sources through the VAD→STT→classify→store // loop. Each source runs in its own goroutine; STT calls are serialised by a // mutex so the shared Whisper instance is used safely. +// labels[i] is the display name for sources[i] used in log messages. // It blocks until ctx is cancelled or all sources exit. -func (p *Pipeline) Run(ctx context.Context, sources []capture.AudioSource) error { +func (p *Pipeline) Run(ctx context.Context, sources []capture.AudioSource, labels []string) error { if len(sources) == 0 { return fmt.Errorf("no audio sources configured") } @@ -98,7 +99,7 @@ func (p *Pipeline) Run(ctx context.Context, sources []capture.AudioSource) error var sourceWg sync.WaitGroup for i, src := range sources { sourceWg.Add(1) - label := sourceLabel(i, len(sources)) + label := labels[i] go func(src capture.AudioSource, label string) { defer sourceWg.Done() if err := p.runSource(ctx, src, label, classifyCh); err != nil { @@ -113,21 +114,6 @@ func (p *Pipeline) Run(ctx context.Context, sources []capture.AudioSource) error return nil } -// sourceLabel returns a short display name for the nth source. -func sourceLabel(i, total int) string { - if total == 1 { - return "mic" - } - switch i { - case 0: - return "mic" - case 1: - return "speaker" - default: - return fmt.Sprintf("src%d", i) - } -} - // runSource runs a single audio source through VAD→STT and enqueues results // onto classifyCh. It returns when ctx is cancelled or the source stream // closes. From 3787affc3773c0767e321852ce9ee265e40a0206 Mon Sep 17 00:00:00 2001 From: Sangmin Lee Date: Sun, 5 Apr 2026 23:45:47 +0900 Subject: [PATCH 4/4] optimize memory --- pkg/audio/segment.go | 23 +++++++++++++++--- pkg/audio/segment_test.go | 12 +++++----- pkg/config/config.go | 13 ++++++++++- pkg/pipeline/pipeline.go | 49 +++++++++++++++++++++++++++++++-------- 4 files changed, 77 insertions(+), 20 deletions(-) diff --git a/pkg/audio/segment.go b/pkg/audio/segment.go index 9ee6a13..63a0fa0 100644 --- a/pkg/audio/segment.go +++ b/pkg/audio/segment.go @@ -17,22 +17,39 @@ type SegmentBuffer struct { startTime time.Time sampleRate int // typically 16000 minDuration time.Duration // minimum speech duration to keep + maxDuration time.Duration // pre-allocation cap; 0 = no pre-alloc isActive bool } -// NewSegmentBuffer creates a buffer with the given sample rate and minimum duration. -func NewSegmentBuffer(sampleRate int, minDuration time.Duration) *SegmentBuffer { +// NewSegmentBuffer creates a buffer with the given sample rate, minimum +// duration, and optional maximum duration. When maxDuration > 0 the backing +// array is pre-allocated to exactly that size at Start() and reused across +// segments, eliminating append-induced reallocations. +func NewSegmentBuffer(sampleRate int, minDuration, maxDuration time.Duration) *SegmentBuffer { return &SegmentBuffer{ sampleRate: sampleRate, minDuration: minDuration, + maxDuration: maxDuration, } } // Start marks the beginning of a speech segment and records the start time. +// If maxDuration was set, the backing array is pre-allocated to that capacity +// (or reused from the previous segment) so no reallocation happens during +// Append calls. func (b *SegmentBuffer) Start() { b.isActive = true b.startTime = time.Now() - b.samples = b.samples[:0] + if b.maxDuration > 0 { + maxSamples := int(b.maxDuration.Seconds() * float64(b.sampleRate)) + if cap(b.samples) < maxSamples { + b.samples = make([]float32, 0, maxSamples) + } else { + b.samples = b.samples[:0] + } + } else { + b.samples = b.samples[:0] + } } // Append adds samples to the buffer. diff --git a/pkg/audio/segment_test.go b/pkg/audio/segment_test.go index 7b983da..b22ef9a 100644 --- a/pkg/audio/segment_test.go +++ b/pkg/audio/segment_test.go @@ -18,7 +18,7 @@ func makeSamples(n int, v float32) []float32 { func TestNewSegmentBuffer(t *testing.T) { minDur := 2 * time.Second - buf := NewSegmentBuffer(testSampleRate, minDur) + buf := NewSegmentBuffer(testSampleRate, minDur, 0) if buf.sampleRate != testSampleRate { t.Errorf("expected sampleRate %d, got %d", testSampleRate, buf.sampleRate) @@ -36,7 +36,7 @@ func TestNewSegmentBuffer(t *testing.T) { func TestSegmentBuffer_ShortSpeech(t *testing.T) { minDur := 2 * time.Second - buf := NewSegmentBuffer(testSampleRate, minDur) + buf := NewSegmentBuffer(testSampleRate, minDur, 0) buf.Start() @@ -58,7 +58,7 @@ func TestSegmentBuffer_ShortSpeech(t *testing.T) { func TestSegmentBuffer_ValidSpeech(t *testing.T) { minDur := 2 * time.Second - buf := NewSegmentBuffer(testSampleRate, minDur) + buf := NewSegmentBuffer(testSampleRate, minDur, 0) buf.Start() @@ -95,7 +95,7 @@ func TestSegmentBuffer_ValidSpeech(t *testing.T) { } func TestSegmentBuffer_Duration(t *testing.T) { - buf := NewSegmentBuffer(testSampleRate, time.Second) + buf := NewSegmentBuffer(testSampleRate, time.Second, 0) buf.Start() @@ -125,7 +125,7 @@ func TestSegmentBuffer_Duration(t *testing.T) { } func TestSegmentBuffer_Reset(t *testing.T) { - buf := NewSegmentBuffer(testSampleRate, time.Second) + buf := NewSegmentBuffer(testSampleRate, time.Second, 0) buf.Start() buf.Append(makeSamples(testSampleRate*2, 0.5)) @@ -155,7 +155,7 @@ func TestSegmentBuffer_Reset(t *testing.T) { func TestSegmentBuffer_MultipleSegments(t *testing.T) { minDur := time.Second - buf := NewSegmentBuffer(testSampleRate, minDur) + buf := NewSegmentBuffer(testSampleRate, minDur, 0) // First segment: 2 seconds. buf.Start() diff --git a/pkg/config/config.go b/pkg/config/config.go index 802c6a5..9c9f9c8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -32,6 +32,12 @@ type Config struct { // When true, audio from speakers (Google Meet, YouTube, etc.) is also // transcribed and stored. Requires Screen Recording permission. CaptureSpeaker bool `yaml:"capture_speaker"` + // MaxSegmentDur caps the maximum length of a single speech segment sent to + // STT. When a segment grows beyond this, it is force-split and transcribed + // immediately even if speech is still ongoing. This prevents unbounded + // memory growth when capturing continuous audio (e.g. long videos). + // 0 disables the cap. Default: 30s. + MaxSegmentDur time.Duration `yaml:"max_segment_duration"` } // DefaultConfig returns a Config populated with default values. @@ -47,6 +53,7 @@ func DefaultConfig() *Config { SkillAgent: "claude", CaptureMic: true, CaptureSpeaker: true, + MaxSegmentDur: 30 * time.Second, } } @@ -122,7 +129,8 @@ func WriteDefault(path string) error { "llm_model: %s\n"+ "skill_agent: %s\n"+ "capture_mic: %v\n"+ - "capture_speaker: %v\n", + "capture_speaker: %v\n"+ + "max_segment_duration: %s\n", cfg.WhisperModel, formatDuration(cfg.MinSpeechDur), formatDuration(cfg.SilenceDuration), @@ -133,6 +141,7 @@ func WriteDefault(path string) error { cfg.SkillAgent, cfg.CaptureMic, cfg.CaptureSpeaker, + formatDuration(cfg.MaxSegmentDur), ) return os.WriteFile(path, []byte(content), 0644) } @@ -157,6 +166,7 @@ func WriteOverrideTemplate(path string, defaults *Config) error { fmt.Sprintf("skill_agent: %s", defaults.SkillAgent), fmt.Sprintf("capture_mic: %v", defaults.CaptureMic), fmt.Sprintf("capture_speaker: %v", defaults.CaptureSpeaker), + fmt.Sprintf("max_segment_duration: %s", formatDuration(defaults.MaxSegmentDur)), } var sb strings.Builder @@ -247,6 +257,7 @@ func WriteSetupOverride(path string, provider, model, agent string, captureMic, {"skill_agent", agent, true}, {"capture_mic", fmt.Sprintf("%v", captureMic), true}, {"capture_speaker", fmt.Sprintf("%v", captureSpeaker), true}, + {"max_segment_duration", formatDuration(defaults.MaxSegmentDur), false}, } var sb strings.Builder diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 99230f7..f427e82 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "strings" + "github.com/sangmin7648/tacit/pkg/audio" "github.com/sangmin7648/tacit/pkg/capture" "github.com/sangmin7648/tacit/pkg/config" @@ -131,11 +133,16 @@ func (p *Pipeline) runSource(ctx context.Context, src capture.AudioSource, label return fmt.Errorf("start stream: %w", err) } - segBuf := audio.NewSegmentBuffer(audio.SampleRate, p.cfg.MinSpeechDur) + segBuf := audio.NewSegmentBuffer(audio.SampleRate, p.cfg.MinSpeechDur, p.cfg.MaxSegmentDur) var frameBuf []int16 silenceFrames := 0 silenceLimit := int(p.cfg.SilenceDuration.Seconds() * float64(audio.SampleRate) / float64(hopSize)) + // textBuf accumulates STT results from split chunks within one speech session. + // All chunks are joined and sent as a single classify item when silence is detected. + var textBuf []string + var sessionStart time.Time + log.Printf("[%s] listening (silence=%v, minSpeech=%v)", label, p.cfg.SilenceDuration, p.cfg.MinSpeechDur) for chunk := range stream { @@ -167,10 +174,26 @@ func (p *Pipeline) runSource(ctx context.Context, src capture.AudioSource, label if isSpeech { silenceFrames = 0 if !segBuf.IsActive() { + if len(textBuf) == 0 { + sessionStart = time.Now() + } segBuf.Start() log.Printf("[%s] speech started", label) } segBuf.Append(audio.Int16ToFloat32(frame)) + + // Force-split long segments to cap memory usage; accumulate + // the resulting text to merge into one file at session end. + if p.cfg.MaxSegmentDur > 0 && segBuf.Duration() >= p.cfg.MaxSegmentDur { + log.Printf("[%s] segment capped at %.1fs, splitting", label, segBuf.Duration().Seconds()) + seg, ok := segBuf.Finish() + if ok { + if text := p.transcribeSync(ctx, seg, label); text != "" { + textBuf = append(textBuf, text) + } + } + segBuf.Start() // speech is still ongoing; restart immediately + } } else if segBuf.IsActive() { segBuf.Append(audio.Int16ToFloat32(frame)) silenceFrames++ @@ -180,10 +203,17 @@ func (p *Pipeline) runSource(ctx context.Context, src capture.AudioSource, label seg, ok := segBuf.Finish() silenceFrames = 0 if ok { - p.transcribeAndQueue(ctx, seg, label, classifyCh) - } else { + if text := p.transcribeSync(ctx, seg, label); text != "" { + textBuf = append(textBuf, text) + } + } else if len(textBuf) == 0 { log.Printf("[%s] segment too short, discarding", label) } + // Flush accumulated text as a single classify item. + if len(textBuf) > 0 { + classifyCh <- classifyItem{text: strings.Join(textBuf, " "), timestamp: sessionStart} + textBuf = textBuf[:0] + } } } } @@ -195,9 +225,9 @@ func (p *Pipeline) runSource(ctx context.Context, src capture.AudioSource, label return nil } -// transcribeAndQueue runs STT (serialised across sources) then queues text -// for async classification. -func (p *Pipeline) transcribeAndQueue(ctx context.Context, seg *audio.AudioSegment, label string, ch chan<- classifyItem) { +// transcribeSync runs STT (serialised across sources) and returns the +// transcribed text, or "" on error or empty result. +func (p *Pipeline) transcribeSync(ctx context.Context, seg *audio.AudioSegment, label string) string { log.Printf("[%s] transcribing %.1fs of audio", label, seg.Duration.Seconds()) p.whisperMu.Lock() @@ -206,15 +236,14 @@ func (p *Pipeline) transcribeAndQueue(ctx context.Context, seg *audio.AudioSegme if err != nil { log.Printf("[%s] STT error: %v", label, err) - return + return "" } if text == "" { log.Printf("[%s] STT produced empty text, skipping", label) - return + return "" } log.Printf("[%s] STT: %s", label, text) - - ch <- classifyItem{text: text, timestamp: time.Now()} + return text } // classifyLoop processes classify items from the channel, batching when