@@ -34,6 +34,7 @@ type AResult<T> = std::result::Result<T, alsa::Error>;
34
34
#[ derive( Clone , Debug ) ]
35
35
pub struct AlsaBackend {
36
36
sender : Arc < Mutex < Sender < AlsaAction > > > ,
37
+ streams : Arc < RwLock < Vec < Stream > > > ,
37
38
}
38
39
39
40
#[ derive( Debug ) ]
@@ -42,7 +43,6 @@ enum AlsaAction {
42
43
Prepare ( usize ) ,
43
44
Release ( usize , ControlMessage ) ,
44
45
Start ( usize ) ,
45
- Stop ( usize ) ,
46
46
Write ( usize ) ,
47
47
Read ( usize ) ,
48
48
}
@@ -145,10 +145,21 @@ fn write_samples_direct(
145
145
let Some ( buffer) = stream. buffers . front_mut ( ) else {
146
146
return Ok ( false ) ;
147
147
} ;
148
+ if !matches ! ( stream. state, PCMState :: Start ) {
149
+ return Ok ( false ) ;
150
+ }
148
151
let mut buf = vec ! [ 0 ; buffer. data_descriptor. len( ) as usize ] ;
149
- let read_bytes = buffer
150
- . consume ( & mut buf)
151
- . expect ( "failed to read buffer from guest" ) ;
152
+ let read_bytes = match buffer. consume ( & mut buf) {
153
+ Err ( err) => {
154
+ log:: error!(
155
+ "Could not read TX buffer from guest, dropping it immediately: {}" ,
156
+ err
157
+ ) ;
158
+ stream. buffers . pop_front ( ) ;
159
+ continue ;
160
+ }
161
+ Ok ( v) => v,
162
+ } ;
152
163
let mut iter = buf[ 0 ..read_bytes as usize ] . iter ( ) . cloned ( ) ;
153
164
let frames = mmap. write ( & mut iter) ;
154
165
let written_bytes = pcm. frames_to_bytes ( frames) ;
@@ -160,48 +171,56 @@ fn write_samples_direct(
160
171
}
161
172
}
162
173
match mmap. status ( ) . state ( ) {
163
- State :: Running => {
164
- return Ok ( false ) ;
165
- }
166
- State :: Prepared => { }
167
- State :: XRun => {
168
- log:: trace!( "Underrun in audio output stream!" ) ;
169
- pcm. prepare ( ) ?
170
- }
171
- State :: Suspended => { }
174
+ State :: Suspended | State :: Running | State :: Prepared => Ok ( false ) ,
175
+ State :: XRun => Ok ( true ) , // Recover from this in next round
172
176
n => panic ! ( "Unexpected pcm state {:?}" , n) ,
173
177
}
174
- Ok ( true )
175
178
}
176
179
177
180
fn write_samples_io (
178
181
p : & alsa:: PCM ,
179
- stream : & mut Stream ,
182
+ streams : & Arc < RwLock < Vec < Stream > > > ,
183
+ stream_id : usize ,
180
184
io : & mut alsa:: pcm:: IO < u8 > ,
181
185
) -> AResult < bool > {
182
- loop {
183
- let avail = match p. avail_update ( ) {
184
- Ok ( n) => n,
185
- Err ( err) => {
186
- log:: trace!( "Recovering from {}" , err) ;
187
- p. recover ( err. errno ( ) as std:: os:: raw:: c_int , true ) ?;
188
- p. avail_update ( ) ?
186
+ let avail = match p. avail_update ( ) {
187
+ Ok ( n) => n,
188
+ Err ( err) => {
189
+ log:: trace!( "Recovering from {}" , err) ;
190
+ p. recover ( err. errno ( ) as std:: os:: raw:: c_int , true ) ?;
191
+ if let Err ( err) = p. start ( ) {
192
+ log:: error!(
193
+ "Could not restart stream {}; ALSA returned: {}" ,
194
+ stream_id,
195
+ err
196
+ ) ;
197
+ return Err ( err) ;
189
198
}
190
- } ;
191
- if avail == 0 {
192
- break ;
199
+ p. avail_update ( ) ?
193
200
}
194
- let written = io. mmap ( avail as usize , |buf| {
201
+ } ;
202
+ if avail != 0 {
203
+ io. mmap ( avail as usize , |buf| {
204
+ let stream = & mut streams. write ( ) . unwrap ( ) [ stream_id] ;
195
205
let Some ( buffer) = stream. buffers . front_mut ( ) else {
196
206
return 0 ;
197
207
} ;
208
+ if !matches ! ( stream. state, PCMState :: Start ) {
209
+ stream. buffers . pop_front ( ) ;
210
+ return 0 ;
211
+ }
198
212
let mut data = vec ! [ 0 ; buffer. data_descriptor. len( ) as usize ] ;
199
-
200
213
// consume() always reads (buffer.data_descriptor.len() -
201
214
// buffer.pos) bytes
202
- let read_bytes = buffer
203
- . consume ( & mut data)
204
- . expect ( "failed to read buffer from guest" ) ;
215
+ let read_bytes = match buffer. consume ( & mut data) {
216
+ Ok ( v) => v,
217
+ Err ( err) => {
218
+ log:: error!( "Could not read TX buffer, dropping it immediately: {}" , err) ;
219
+ stream. buffers . pop_front ( ) ;
220
+ return 0 ;
221
+ }
222
+ } ;
223
+
205
224
let mut iter = data[ 0 ..read_bytes as usize ] . iter ( ) . cloned ( ) ;
206
225
207
226
let mut written_bytes = 0 ;
@@ -217,14 +236,12 @@ fn write_samples_io(
217
236
. try_into ( )
218
237
. unwrap_or_default ( )
219
238
} ) ?;
220
- if written == 0 {
221
- break ;
222
- } ;
239
+ } else {
240
+ return Ok ( false ) ;
223
241
}
224
242
225
243
match p. state ( ) {
226
- State :: Suspended | State :: Running => Ok ( false ) ,
227
- State :: Prepared => Ok ( false ) ,
244
+ State :: Suspended | State :: Running | State :: Prepared => Ok ( false ) ,
228
245
State :: XRun => Ok ( true ) , // Recover from this in next round
229
246
n => panic ! ( "Unexpected pcm state {:?}" , n) ,
230
247
}
@@ -237,47 +254,44 @@ fn alsa_worker(
237
254
stream_id : usize ,
238
255
) -> AResult < ( ) > {
239
256
loop {
257
+ // We get a `true` every time a new I/O message is received from the guest.
258
+ // If the recv() returns `Ok(false)` or an error, terminate this worker thread.
240
259
let Ok ( do_write) = receiver. recv ( ) else {
241
260
return Ok ( ( ) ) ;
242
261
} ;
243
262
if do_write {
244
- loop {
245
- if matches ! ( receiver. try_recv( ) , Ok ( false ) ) {
246
- break ;
247
- }
248
-
263
+ let has_buffers = || -> bool {
264
+ // Hold `streams` lock as short as possible.
265
+ let lck = streams. read ( ) . unwrap ( ) ;
266
+ !lck[ stream_id] . buffers . is_empty ( )
267
+ && matches ! ( lck[ stream_id] . state, PCMState :: Start )
268
+ } ;
269
+ // Run this loop till the stream's buffer vector is empty:
270
+ ' empty_buffers: while has_buffers ( ) {
271
+ // When we return from a write attempt and there is still space in the
272
+ // stream's buffers, get the ALSA file descriptors and poll them till the host
273
+ // sound device tells us there is more available data.
249
274
let mut fds = {
250
275
let lck = pcm. lock ( ) . unwrap ( ) ;
251
- if matches ! ( lck. state( ) , State :: Running | State :: Prepared | State :: XRun ) {
252
- let mut mmap = lck. direct_mmap_playback :: < u8 > ( ) . ok ( ) ;
276
+ let mut mmap = lck. direct_mmap_playback :: < u8 > ( ) . ok ( ) ;
253
277
254
- if let Some ( ref mut mmap) = mmap {
255
- if write_samples_direct (
256
- & lck,
257
- & mut streams. write ( ) . unwrap ( ) [ stream_id] ,
258
- mmap,
259
- ) ? {
260
- continue ;
261
- }
262
- } else {
263
- let mut io = lck. io_bytes ( ) ;
264
- // Direct mode unavailable, use alsa-lib's mmap emulation instead
265
- if write_samples_io (
266
- & lck,
267
- & mut streams. write ( ) . unwrap ( ) [ stream_id] ,
268
- & mut io,
269
- ) ? {
270
- continue ;
271
- }
278
+ if let Some ( ref mut mmap) = mmap {
279
+ if write_samples_direct (
280
+ & lck,
281
+ & mut streams. write ( ) . unwrap ( ) [ stream_id] ,
282
+ mmap,
283
+ ) ? {
284
+ continue ' empty_buffers;
272
285
}
273
- lck. get ( ) ?
274
286
} else {
275
- drop ( lck) ;
276
- sleep ( Duration :: from_millis ( 500 ) ) ;
277
- continue ;
287
+ let mut io = lck. io_bytes ( ) ;
288
+ // Direct mode unavailable, use alsa-lib's mmap emulation instead
289
+ if write_samples_io ( & lck, & streams, stream_id, & mut io) ? {
290
+ continue ' empty_buffers;
291
+ }
278
292
}
293
+ lck. get ( ) ?
279
294
} ;
280
- // Nothing to do, sleep until woken up by the kernel.
281
295
alsa:: poll:: poll ( & mut fds, 100 ) ?;
282
296
}
283
297
}
@@ -288,14 +302,15 @@ impl AlsaBackend {
288
302
pub fn new ( streams : Arc < RwLock < Vec < Stream > > > ) -> Self {
289
303
let ( sender, receiver) = channel ( ) ;
290
304
let sender = Arc :: new ( Mutex :: new ( sender) ) ;
305
+ let streams2 = Arc :: clone ( & streams) ;
291
306
292
307
thread:: spawn ( move || {
293
- if let Err ( err) = Self :: run ( streams , receiver) {
308
+ if let Err ( err) = Self :: run ( streams2 , receiver) {
294
309
log:: error!( "Main thread exited with error: {}" , err) ;
295
310
}
296
311
} ) ;
297
312
298
- Self { sender }
313
+ Self { sender, streams }
299
314
}
300
315
301
316
fn run (
@@ -380,21 +395,7 @@ impl AlsaBackend {
380
395
) ;
381
396
}
382
397
}
383
- }
384
- AlsaAction :: Stop ( stream_id) => {
385
- if stream_id >= streams_no {
386
- log:: error!(
387
- "Received Stop action for stream id {} but there are only {} PCM \
388
- streams.",
389
- stream_id,
390
- pcms. len( )
391
- ) ;
392
- continue ;
393
- } ;
394
- let stop_result = streams. write ( ) . unwrap ( ) [ stream_id] . state . stop ( ) ;
395
- if let Err ( err) = stop_result {
396
- log:: error!( "Stream {} stop {}" , stream_id, err) ;
397
- }
398
+ senders[ stream_id] . send ( true ) . unwrap ( ) ;
398
399
}
399
400
AlsaAction :: Prepare ( stream_id) => {
400
401
if stream_id >= streams_no {
@@ -529,8 +530,21 @@ impl AudioBackend for AlsaBackend {
529
530
read Read ,
530
531
prepare Prepare ,
531
532
start Start ,
532
- stop Stop ,
533
533
}
534
+
535
+ fn stop ( & self , id : u32 ) -> CrateResult < ( ) > {
536
+ if let Some ( Err ( err) ) = self
537
+ . streams
538
+ . write ( )
539
+ . unwrap ( )
540
+ . get_mut ( id as usize )
541
+ . map ( |s| s. state . stop ( ) )
542
+ {
543
+ log:: error!( "Stream {} stop {}" , id, err) ;
544
+ }
545
+ Ok ( ( ) )
546
+ }
547
+
534
548
send_action ! {
535
549
ctrl set_parameters SetParameters ,
536
550
ctrl release Release ,
0 commit comments