Skip to content

Commit

Permalink
Merge pull request #8 from membraneframework/feature/sync
Browse files Browse the repository at this point in the history
Feature/sync
  • Loading branch information
Hajto authored Oct 2, 2019
2 parents 4d57f5b + d57cbcf commit 7df4cbd
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 63 deletions.
25 changes: 17 additions & 8 deletions c_src/membrane_element_portaudio/pa_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
#define MEMBRANE_LOG_TAG log_tag
#include <membrane/log.h>

char *init_pa(UnifexEnv *env, char *log_tag, char direction, PaStream **stream,
void *state, PaSampleFormat sample_format, int sample_rate,
int channels, char *latency_str, int pa_buffer_size,
PaDeviceIndex endpoint_id, PaStreamCallback *callback) {
char *init_pa(UnifexEnv *env, char *log_tag, StreamDirection direction,
PaStream **stream, void *state, PaSampleFormat sample_format,
int sample_rate, int channels, char *latency_str, int *latency_ms,
int pa_buffer_size, PaDeviceIndex endpoint_id,
PaStreamCallback *callback) {
char *ret_error = NULL;
PaError pa_error;

Expand Down Expand Up @@ -48,16 +49,14 @@ char *init_pa(UnifexEnv *env, char *log_tag, char direction, PaStream **stream,

PaStreamParameters *input_stream_params_ptr = NULL;
PaStreamParameters *output_stream_params_ptr = NULL;
if (direction)
if (direction == STREAM_DIRECTION_OUT)
output_stream_params_ptr = &stream_params;
else
input_stream_params_ptr = &stream_params;

pa_error =
Pa_OpenStream(stream, input_stream_params_ptr, output_stream_params_ptr,
sample_rate, pa_buffer_size,
0, // PaStreamFlags
callback,
sample_rate, pa_buffer_size, paNoFlag, callback,
state // passed to the callback
);

Expand All @@ -68,6 +67,16 @@ char *init_pa(UnifexEnv *env, char *log_tag, char direction, PaStream **stream,
goto error;
}

const PaStreamInfo *stream_info = Pa_GetStreamInfo(*stream);
PaTime latency_sec;
if (direction == STREAM_DIRECTION_OUT) {
latency_sec = stream_info->outputLatency;
} else {
latency_sec = stream_info->inputLatency;
}

*latency_ms = (int)(latency_sec * 1000);

pa_error = Pa_StartStream(*stream);
if (pa_error != paNoError) {
MEMBRANE_WARN(env, "Pa_StartStream: error = %d (%s)", pa_error,
Expand Down
11 changes: 7 additions & 4 deletions c_src/membrane_element_portaudio/pa_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
#include <string.h>
#include <unifex/unifex.h>

char *init_pa(UnifexEnv *env, char *log_tag, char direction, PaStream **stream,
void *state, PaSampleFormat sample_format, int sample_rate,
int channels, char *latency_str, int pa_buffer_size,
PaDeviceIndex endpoint_id, PaStreamCallback *callback);
typedef enum { STREAM_DIRECTION_IN, STREAM_DIRECTION_OUT } StreamDirection;

char *init_pa(UnifexEnv *env, char *log_tag, StreamDirection direction,
PaStream **stream, void *state, PaSampleFormat sample_format,
int sample_rate, int channels, char *latency_str, int *latency_ms,
int pa_buffer_size, PaDeviceIndex endpoint_id,
PaStreamCallback *callback);

char *destroy_pa(UnifexEnv *env, char *log_tag, PaStream *stream);
38 changes: 27 additions & 11 deletions c_src/membrane_element_portaudio/sink.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
#include <membrane/log.h>

#define FRAME_SIZE 4 // TODO hardcoded format, stereo frame, 16bit
#define BUFFERS_PER_TICK 100

#define SAMPLE_RATE 48000
#define SAMPLE_RATE_PER_MS (SAMPLE_RATE / 100)
#define CHANNELS_NUM 2

void handle_destroy_state(UnifexEnv *env, SinkState *state) {
if (state->is_content_destroyed)
Expand All @@ -11,8 +16,9 @@ void handle_destroy_state(UnifexEnv *env, SinkState *state) {
memcpy(temp_state, state, sizeof(SinkState));

UnifexPid exec_pid;
if (!unifex_get_pid_by_name(
env, "Elixir.Membrane.Element.PortAudio.SyncExecutor", 0, &exec_pid) ||
if (!unifex_get_pid_by_name(env,
"Elixir.Membrane.Element.PortAudio.SyncExecutor",
0, &exec_pid) ||
!send_destroy(env, exec_pid, 0, temp_state)) {
MEMBRANE_WARN(env, "PortAudio sink: failed to destroy state");
}
Expand All @@ -29,6 +35,12 @@ static int callback(const void *_input_buffer, void *output_buffer,
UnifexEnv *env = unifex_alloc_env();
SinkState *state = (SinkState *)user_data;

if (++state->ticks % BUFFERS_PER_TICK == 0) {
send_membrane_clock_update(env, state->membrane_clock, UNIFEX_SEND_THREADED,
BUFFERS_PER_TICK * frames_per_buffer,
SAMPLE_RATE_PER_MS);
}

size_t elements_available =
membrane_ringbuffer_get_read_available(state->ringbuffer);
if (elements_available >= frames_per_buffer) {
Expand All @@ -51,12 +63,14 @@ static int callback(const void *_input_buffer, void *output_buffer,
return paContinue;
}

UNIFEX_TERM create(UnifexEnv *env, UnifexPid demand_handler, int endpoint_id,
UNIFEX_TERM create(UnifexEnv *env, UnifexPid demand_handler,
UnifexPid membrane_clock, int endpoint_id,
int ringbuffer_size, int pa_buffer_size, char *latency) {
MEMBRANE_DEBUG(env, "initializing");

char *error;
SinkState *state = NULL;
int latency_ms;
UNIFEX_TERM res;

MembraneRingBuffer *ringbuffer =
Expand All @@ -74,24 +88,26 @@ UNIFEX_TERM create(UnifexEnv *env, UnifexPid demand_handler, int endpoint_id,
state->is_content_destroyed = 0;
state->ringbuffer = ringbuffer;
state->demand_handler = demand_handler;
state->membrane_clock = membrane_clock;
state->stream = NULL;
state->demand = 0;
state->ticks = 0;

error = init_pa(env, MEMBRANE_LOG_TAG,
1, // direction
&(state->stream), state,
paInt16, // sample format #TODO hardcoded
48000, // sample rate #TODO hardcoded
2, // channels #TODO hardcoded
latency, pa_buffer_size, endpoint_id, callback);
error = init_pa(env, MEMBRANE_LOG_TAG, STREAM_DIRECTION_OUT, &(state->stream),
state,
paInt16, // sample format #TODO hardcoded
SAMPLE_RATE, // sample rate #TODO hardcoded
CHANNELS_NUM, // channels #TODO hardcoded
latency, &latency_ms, pa_buffer_size, endpoint_id, callback);

if (error) {
goto error;
}

error:

res = error ? create_result_error(env, error) : create_result_ok(env, state);
res = error ? create_result_error(env, error)
: create_result_ok(env, latency_ms, state);

if (state) {
unifex_release_state(env, state);
Expand Down
2 changes: 2 additions & 0 deletions c_src/membrane_element_portaudio/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ typedef struct _SinkState {
PaStream *stream;
MembraneRingBuffer *ringbuffer;
UnifexPid demand_handler; // Where to send demands
UnifexPid membrane_clock;
int demand;
int ticks;
} SinkState;

typedef SinkState UnifexNifState;
Expand Down
4 changes: 3 additions & 1 deletion c_src/membrane_element_portaudio/sink.spec.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ module Native

spec create(
demand_handler :: pid,
clock :: pid,
endpoint_id :: int,
ringbuffer_size :: int,
pa_buffer_size :: int,
latency :: atom
) :: {:ok :: label, state} | {:error :: label, reason :: atom}
) :: {:ok :: label, {latency_ms :: int, state}} | {:error :: label, reason :: atom}

spec write_data(payload, state) :: (:ok :: label) | {:error :: label, :overrun :: label}

spec destroy(state) :: :ok

sends {Native, :destroy :: label, state}
sends {:portaudio_demand :: label, size :: int}
sends {:membrane_clock_update :: label, {frames :: int, sample_rate_ms :: int}}
19 changes: 10 additions & 9 deletions c_src/membrane_element_portaudio/source.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ void handle_destroy_state(UnifexEnv *env, SourceState *state) {
memcpy(temp_state, state, sizeof(SourceState));

UnifexPid exec_pid;
if (!unifex_get_pid_by_name(
env, "Elixir.Membrane.Element.PortAudio.SyncExecutor", 0, &exec_pid) ||
if (!unifex_get_pid_by_name(env,
"Elixir.Membrane.Element.PortAudio.SyncExecutor",
0, &exec_pid) ||
!send_destroy(env, exec_pid, 0, temp_state)) {
MEMBRANE_WARN(env, "Failed to destroy state");
}
Expand Down Expand Up @@ -52,13 +53,13 @@ UNIFEX_TERM create(UnifexEnv *env, UnifexPid destination, int endpoint_id,
state->destination = destination;
state->stream = NULL;

char *error = init_pa(env, MEMBRANE_LOG_TAG,
0, // direction
&(state->stream), state,
paInt16, // sample format #TODO hardcoded
48000, // sample rate #TODO hardcoded
2, // channels #TODO hardcoded
latency, pa_buffer_size, endpoint_id, callback);
int _latency_ms;
char *error = init_pa(
env, MEMBRANE_LOG_TAG, STREAM_DIRECTION_IN, &(state->stream), state,
paInt16, // sample format #TODO hardcoded
48000, // sample rate #TODO hardcoded
2, // channels #TODO hardcoded
latency, &_latency_ms, pa_buffer_size, endpoint_id, callback);

UNIFEX_TERM res =
error ? create_result_error(env, error) : create_result_ok(env, state);
Expand Down
27 changes: 21 additions & 6 deletions lib/membrane_element_portaudio/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,23 @@ defmodule Membrane.Element.PortAudio.Sink do
Audio sink that plays sound via multi-platform PortAudio library.
"""

use Membrane.Sink

import Mockery.Macro

alias Membrane.Buffer
alias Membrane.Caps.Audio.Raw, as: Caps
alias Membrane.Element.PortAudio.SyncExecutor
alias Membrane.Time
alias __MODULE__.Native
import Mockery.Macro
use Membrane.Sink

@pa_no_device -1

def_clock """
This clock measures time by counting a number of samples consumed by a PortAudio device
and allows synchronization with it.
"""

# FIXME hardcoded caps
def_input_pad :input,
demand_unit: :bytes,
Expand Down Expand Up @@ -48,12 +56,13 @@ defmodule Membrane.Element.PortAudio.Sink do
options
|> Map.from_struct()
|> Map.merge(%{
native: nil
native: nil,
latency_time: 0
})}
end

@impl true
def handle_prepared_to_playing(_ctx, state) do
def handle_prepared_to_playing(ctx, state) do
%{
endpoint_id: endpoint_id,
ringbuffer_size: ringbuffer_size,
Expand All @@ -63,20 +72,26 @@ defmodule Membrane.Element.PortAudio.Sink do

endpoint_id = if endpoint_id == :default, do: @pa_no_device, else: endpoint_id

with {:ok, native} <-
with {:ok, {latency_ms, native}} <-
SyncExecutor.apply(Native, :create, [
self(),
ctx.clock,
endpoint_id,
ringbuffer_size,
pa_buffer_size,
latency
]) do
{:ok, %{state | native: native}}
{:ok, %{state | latency_time: latency_ms |> Time.milliseconds(), native: native}}
else
{:error, reason} -> {{:error, reason}, state}
end
end

@impl true
def handle_playing_to_prepared(_ctx, %{native: nil} = state) do
{:ok, state}
end

@impl true
def handle_playing_to_prepared(_ctx, %{native: native} = state) do
{SyncExecutor.apply(Native, :destroy, native), %{state | native: nil}}
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane_element_portaudio/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ defmodule Membrane.Element.PortAudio.Source do
Audio source that captures sound via multi-platform PortAudio library.
"""

use Membrane.Source

alias Membrane.Buffer
alias Membrane.Caps.Audio.Raw, as: Caps
alias Membrane.Element.PortAudio.SyncExecutor
alias __MODULE__.Native
use Membrane.Source

@pa_no_device -1

Expand Down
9 changes: 6 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Membrane.Element.PortAudio.Mixfile do
use Mix.Project

@github_url "https://github.com/membraneframework/membrane-element-portaudio"
@version "0.2.4"
@version "0.2.5"

def project do
[
Expand All @@ -29,9 +29,9 @@ defmodule Membrane.Element.PortAudio.Mixfile do

defp deps do
[
{:membrane_core, "~> 0.4.0"},
{:membrane_core, "~> 0.4.1"},
{:membrane_common_c, "~> 0.2.4"},
{:bunch, "~> 1.0"},
{:bunch, "~> 1.2"},
{:unifex, "~> 0.2.0"},
{:membrane_caps_audio_raw, "~> 0.1.0"},
{:bundlex, "~> 0.2.0"},
Expand All @@ -44,6 +44,9 @@ defmodule Membrane.Element.PortAudio.Mixfile do
[
main: "readme",
extras: ["README.md"],
nest_modules_by_prefix: [
Membrane.Element.PortAudio
],
source_ref: "v#{@version}"
]
end
Expand Down
6 changes: 2 additions & 4 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@
"bunch_native": {:hex, :bunch_native, "0.2.1", "0227d2a751a32f8c0b77dfec57c8dc7216351720c9c755c467e6d9387467fd1f", [:mix], [{:bundlex, "~> 0.2.7", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm"},
"bundlex": {:hex, :bundlex, "0.2.7", "8f46199bf4cf84a60cdfc142edeafbab37040167acc35dda8aa70433f2ff8162", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:secure_random, "~> 0.5", [hex: :secure_random, repo: "hexpm", optional: false]}], "hexpm"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm"},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []},
"earmark": {:hex, :earmark, "1.4.0", "397e750b879df18198afc66505ca87ecf6a96645545585899f6185178433cc09", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.4.1", "07bb382826ee8d08d575a1981f971ed41bd5d7e86b917fd012a93c51b5d28727", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.21.2", "caca5bc28ed7b3bdc0b662f8afe2bee1eedb5c3cf7b322feeeb7c6ebbde089d6", [:mix], [{:earmark, "~> 1.3.3 or ~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"membrane_caps_audio_raw": {:hex, :membrane_caps_audio_raw, "0.1.8", "1b57890d14b95a3d5f01d03b4d99c9742134730c6e4580965b538aa704bd8c4e", [:mix], [{:bimap, "~> 1.0", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.4.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm"},
"membrane_common_c": {:hex, :membrane_common_c, "0.2.4", "38ef741e702d7826cc8498673c8964a1974bbb4ce1e423d39a34359459e213a1", [:mix], [{:bundlex, "~> 0.2.0", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.4.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.2.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 0.2.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm"},
"membrane_core": {:hex, :membrane_core, "0.4.0", "fef5331ff2f2a20a743d7718076032e041532e4a6d942b57223a445e0b9fce9e", [:mix], [{:bunch, "~> 1.2", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}], "hexpm"},
"membrane_core": {:hex, :membrane_core, "0.4.1", "603054b637c7514cef7454a5aadd224b088f9dabb037e11d968d315d4dea1260", [:mix], [{:bunch, "~> 1.2", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}], "hexpm"},
"mockery": {:hex, :mockery, "2.3.0", "f1af3976916e7402427116f491e3038a251857de8f0836952c2fa24ad6de4317", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"},
"numbers": {:hex, :numbers, "5.1.1", "1277dbee5dc73b0e1608bd6d318bd8338cbcba5b68cade65f24e4bb402676b5c", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"porcelain": {:hex, :porcelain, "2.0.3", "2d77b17d1f21fed875b8c5ecba72a01533db2013bd2e5e62c6d286c029150fdc", [:mix], [], "hexpm"},
"qex": {:hex, :qex, "0.5.0", "5a3a9becf67d4006377c4c247ffdaaa8ae5b3634a0caadb788dc24d6125068f4", [:mix], [], "hexpm"},
"ratio": {:hex, :ratio, "2.2.2", "c26013b9af7d03cf451bbe11608046e4c9d06e127f5e387668ae97163e75a6dd", [:mix], [{:numbers, "~> 5.1.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm"},
"secure_random": {:hex, :secure_random, "0.5.1", "c5532b37c89d175c328f5196a0c2a5680b15ebce3e654da37129a9fe40ebf51b", [:mix], [], "hexpm"},
Expand Down
Loading

0 comments on commit 7df4cbd

Please sign in to comment.