Skip to content

Commit

Permalink
Merge pull request #25 from wsc1/master
Browse files Browse the repository at this point in the history
cb cross package, deadlines
  • Loading branch information
wsc1 authored Oct 3, 2018
2 parents 021b1b1 + cfe16ae commit b055012
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 90 deletions.
32 changes: 11 additions & 21 deletions libsio/cb.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@

#include "cb.h"

// forward decl.
static void toGoAndBack(Cb *cb);
static void inCb(Cb *cb, void *in, int nf);
static void outCb(Cb *cb, void *out, int *nf);
static void duplexCb(Cb *cb, void *out, int *nf, void *in, int isz);

Cb * newCb(int bufSz) {
Cb * cb = (Cb*) malloc(sizeof(Cb));
Expand All @@ -27,32 +32,17 @@ Cb * newCb(int bufSz) {
cb->time.tv_sec = 0;
cb->time.tv_nsec = 1000;
cb->inGo = 0;
fprintf(stderr, "cb c addr %p inGo c addr on new: %p\n", cb, &cb->inGo);
cb->inCb = inCb;
cb->outCb = outCb;
cb->duplexCb = duplexCb;
return cb;
}

void freeCb(Cb *cb) {
free(cb);
}

void * getIn(Cb *cb) {
return cb->in;
}

int getInF(Cb *cb) {
return cb->inF;
}

void * getOut(Cb *cb) {
return cb->out;
}

int getOutF(Cb *cb) {
return cb->outF;
}

// forward decl.
static void toGoAndBack(Cb *cb);

/*
From Ian Lance Taylor: in C11 stdatomic terms Go atomic.CompareAndSwap is like
Expand All @@ -70,20 +60,20 @@ control an airplane. But for audio, we'll give it a shot.
* inCb is written to be called in an audio i/o callback API, as described
* in cb.md, for capture.
*/
void inCb(Cb *cb, void *in, int nF) {
static void inCb(Cb *cb, void *in, int nF) {
cb->in = in;
cb->inF = nF;
toGoAndBack(cb);
}

void outCb(Cb *cb, void *out, int *nF) {
static void outCb(Cb *cb, void *out, int *nF) {
cb->out = out;
cb->outF = *nF;
toGoAndBack(cb);
*nF = cb->outF;
}

void duplexCb(Cb *cb, void *out, int *onF, void *in, int inF) {
static void duplexCb(Cb *cb, void *out, int *onF, void *in, int inF) {
cb->in = in;
cb->inF = inF;
cb->out = out;
Expand Down
67 changes: 46 additions & 21 deletions libsio/cb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

package libsio

// #cgo CFLAGS: -std=c11
// #cgo CFLAGS: -std=c11 -DLIBSIO
// #include "cb.h"
import "C"
import (
"errors"
"fmt"
"io"
"runtime"
"sync/atomic"
Expand Down Expand Up @@ -123,6 +124,11 @@ type MissedDeadline struct {
OffBy time.Duration
}

// String for convenience.
func (m *MissedDeadline) String() string {
return fmt.Sprintf("missed frame %d by %s\n", m.Frame, m.OffBy)
}

// NewCb creates a new Cb for the specified form (channels + sample rate)
// sample codec and buffer size b in frames.
func NewCb(v sound.Form, sco sample.Codec, b int) *Cb {
Expand Down Expand Up @@ -166,9 +172,9 @@ func (r *Cb) LastMissed() bool {
// By default, this is equal to the buffer size. As a result,
// if the minimum number of frames exchanged is less than the
// buffer size, it should be set with SetMinCbFrames. A value
// of 0 is acceptable if the value is unknown.
// of 1 is acceptable if the value is unknown.
//
// This has an effect on CPU utilisation, as deadlines
// This has an effect on CPU utilisation, as sleep deadlines
// are calculated with respect to the minimum number of
// frames that may be exchanged with the underlying API.
// So if the minimum is significantly less than the buffer frame size,
Expand Down Expand Up @@ -202,6 +208,7 @@ func (r *Cb) Receive(d []float64) (int, error) {
bps := r.sco.Bytes()
var nf, onf int // frame counter and overlap frame count
var cbBuf []byte // cast from C pointer callback data

for start < nF {
if err := r.fromC(addr); err != nil {
return 0, ErrCApiLost
Expand All @@ -215,8 +222,9 @@ func (r *Cb) Receive(d []float64) (int, error) {
}
return 0, io.EOF
}

if start == 0 && r.frames == 0 {
r.setOrgTime(-nf)
r.setOrgTime(0)
}

// in case the C cb doesn't align to the buffer size
Expand All @@ -243,6 +251,7 @@ func (r *Cb) Receive(d []float64) (int, error) {
}
start += nf
r.frames += int64(nf)
r.checkDeadline(r.frames + int64(len(r.over)))
}
r.il.Deinter(d[:start*nC])
return start, nil
Expand All @@ -269,6 +278,7 @@ func (r *Cb) Send(d []float64) error {
var nf int
var cbBuf []byte
for start < nF {
r.checkDeadline(r.frames)
if err := r.fromC(addr); err != nil {
return ErrCApiLost
}
Expand All @@ -295,12 +305,18 @@ func (r *Cb) Send(d []float64) error {
if err := r.toC(addr); err != nil {
return ErrCApiLost
}
start += nf
r.frames += int64(nf)
start += nf
}
return nil
}

// C returns a pointer to the C.Cb which does the C callbacks for
// r.
func (r *Cb) C() unsafe.Pointer {
return unsafe.Pointer(r.c)
}

// TBD
func (r *Cb) SendReceive(out, in []float64) (int, error) {
return 0, nil
Expand All @@ -316,33 +332,42 @@ func (r *Cb) setOrgTime(nf int) {
r.orgTime = time.Now().Add(d)
}

// sleep only if underlying API is regular w.r.t.
// supplied buffer sizes and buffer size is bigger
// than OS latency jitter.
// maybeSleep sleeps only if the minimum buffer size is bigger than estimated
// OS latency jitter. see sleepSlack above.
func (r *Cb) maybeSleep() {
if r.frames == 0 {
return
}
if r.minCbf == 0 {
trg := r.orgTime.Add(time.Duration(int64(r.bsz)+r.frames) * r.frameDur)
deadline := time.Until(trg)
if deadline < 0 {
r.misses = append(r.misses, MissedDeadline{r.frames, deadline})
}
return
}
trg := r.orgTime.Add(time.Duration(int64(r.minCbf)+r.frames) * r.frameDur)
trg := r.orgTime.Add(time.Duration(int64(r.bsz)+r.frames) * r.frameDur)
deadline := time.Until(trg)
if deadline < 0 {
r.misses = append(r.misses, MissedDeadline{r.frames, deadline})
return
}
if deadline <= sleepSlack {
return
}
time.Sleep(deadline - sleepSlack)
}

// checkDeadline checks whether r has missed a deadline according to the sample rate
// and the number of sample frames exchanged.
//
// checkDeadline only works after some samples have been exchanged with the underlying
// API. It is called before exchanging subsequent samples to ensure that the exchange
// occurs before the real time represented by previously exchanged samples.
//
// Since the underlying API may allow us be late from time to time like this, a missed
// deadline does not necessarily imply that we have caused glitching. No missed
// deadlines does imply the underlying API should have the opportunity to proceed
// without glitching.
func (r *Cb) checkDeadline(nf int64) {
if r.frames == 0 {
return
}
trg := r.orgTime.Add(time.Duration(nf+1) * r.frameDur)
deadline := time.Until(trg)
if deadline < 0 {
r.misses = append(r.misses, MissedDeadline{nf, -deadline})
}
}

// ErrCApiLost can be returned if the thread running the C API
// is somehow killed or the hardware causes the callbacks to
// block.
Expand Down
38 changes: 24 additions & 14 deletions libsio/cb.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,38 @@
#include <stdint.h>
#include <time.h>

// Cb holds data for exchanging information between a C or OS callback on a
// dedicated thread foreign to the calling application and Go.
//
typedef struct Cb {
int bufSz;
_Atomic uint32_t inGo;
void * in;
int inF;
void * out;
int outF;
struct timespec time;
int bufSz; // in frames
_Atomic uint32_t inGo; // whether we've passed of the buffer to go and stlll await it to finish
void * in; // input buffer
int inF; // input number of sample frames
void * out; // output buffer
int outF; // output number of sample frames
struct timespec time; // for throttling a bit when there's a long wait.


// function pointers below are used to give access to callbacks to
// other Go packaages. They can can access a Cb * as a go unsafe.Pointer by importing
// libsio, and then C code in that package can call these function pointers.
// It doesn't seem possible to share C function definitions between packages by cgo alone,
// so we came up with this mechanism. These fields are populated by newCb.
void (*inCb)(struct Cb *cb, void *in, int nf);
void (*outCb)(struct Cb *cb, void *out, int *nf);
void (*duplexCb)(struct Cb *cb, void *out, int *onf, void *in, int inf);
} Cb;

#ifdef LIBSIO

// only libsio can access these functions via cgo package "C".
Cb * newCb(int bufSz);

void freeCb(Cb *cb);
void * getIn(Cb *cb);
int getInF(Cb *cb);
void * getOut(Cb *cb);
int getOutF(Cb *cb);
void inCb(Cb *cb, void *in, int nf);
void outCb(Cb *cb, void *out, int *nf);
void duplexCb(Cb *cb, void *out, int *nf, void *in, int isz);
void closeCb(Cb *cb);

#endif


#endif
14 changes: 14 additions & 0 deletions libsio/cb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ func TestCbCapture(t *testing.T) {
} else if n != b {
t.Errorf("expected %d got %d\n", b, n)
}
if cb.LastMissed() {
fmt.Printf("misses:\n")
ms := cb.LastMisses()
for i := range ms {
fmt.Printf("\t%s\n", &ms[i])
}
}
}
}

Expand All @@ -44,5 +51,12 @@ func TestCbPlay(t *testing.T) {
if err != nil {
t.Error(err)
}
if cb.LastMissed() {
fmt.Printf("misses:\n")
ms := cb.LastMisses()
for i := range ms {
fmt.Printf("\t%s\n", &ms[i])
}
}
}
}
5 changes: 3 additions & 2 deletions libsio/runcb.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ package libsio
// sleepTime.tv_sec = 0;
// sleepTime.tv_nsec = 500000L;
// int of;
// Cb *cb = rcb->cb;
// for (int i = 0; i < rcb->n; i++) {
// if (rcb->input) {
// inCb(rcb->cb, rcb->buf, rcb->bf);
// cb->inCb(cb, rcb->buf, rcb->bf);
// } else {
// of = rcb->bf;
// outCb(rcb->cb, rcb->buf, &of);
// cb->outCb(cb, rcb->buf, &of);
// }
// nanosleep(&sleepTime, NULL);
// }
Expand Down
3 changes: 3 additions & 0 deletions ports/darwin/aqs_darwin.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2018 The ZikiChombo Authors. All rights reserved. Use of this source
// code is governed by a license that can be found in the License file.

package darwin

import (
Expand Down
Loading

0 comments on commit b055012

Please sign in to comment.