Skip to content

Commit 3d28c9f

Browse files
committed
fix(backend/runner): Processes are stoppable, thanks God, thank you
1 parent 3e51c32 commit 3d28c9f

File tree

5 files changed

+188
-29
lines changed

5 files changed

+188
-29
lines changed

Cargo.lock

Lines changed: 100 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/runner/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ authors = ["Apika Luca"]
99
test = true
1010

1111
[dependencies]
12+
async-io = "2.4.1"
1213
hakoniwa = "1.1.0"
1314
lsp-types = "0.97.0"
1415
nix = "0.29.0"

backend/runner/src/hakoniwa_ext.rs

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1+
use std::{io::Read, ops, os::fd::AsFd, ptr::read};
2+
3+
use async_io::Async;
14
use hakoniwa::{Child, ExitStatus};
2-
use nix::sys::wait::{self, WaitPidFlag, WaitStatus};
5+
use nix::{
6+
sys::wait::{self, WaitPidFlag, WaitStatus},
7+
unistd::Pid,
8+
};
39

4-
trait HakoniwaChildExt {
10+
pub trait HakoniwaChildExt {
511
fn try_wait(&self) -> Option<ExitStatus>;
612
}
713

814
impl HakoniwaChildExt for Child {
915
fn try_wait(&self) -> Option<ExitStatus> {
10-
match wait::waitpid(self.id(), Some(WaitPidFlag::WNOHANG)) {
16+
match wait::waitpid(Pid::from_raw(self.id() as i32), Some(WaitPidFlag::WNOHANG)) {
1117
Ok(WaitStatus::StillAlive) => None,
1218
Ok(WaitStatus::Exited(_, code)) => Some(ExitStatus {
1319
code,
@@ -16,13 +22,65 @@ impl HakoniwaChildExt for Child {
1622
exit_code: None,
1723
rusage: None,
1824
}),
19-
Ok(WaitStatus::Signaled(_, signal, _) | WaitStatus::Stopped(_, signal)) => Some(ExitStatus {
20-
code: signal as i32,
21-
reason: signal.as_str().to_owned(),
22-
exit_code: None,
23-
rusage: None,
24-
}),
25+
Ok(WaitStatus::Signaled(_, signal, _) | WaitStatus::Stopped(_, signal)) => {
26+
Some(ExitStatus {
27+
code: signal as i32,
28+
reason: signal.as_str().to_owned(),
29+
exit_code: None,
30+
rusage: None,
31+
})
32+
}
2533
Ok(WaitStatus::Continued(_)) => None,
34+
Ok(_) => None,
35+
Err(err) => {
36+
println!("[ERROR] {err}");
37+
None
38+
}
2639
}
2740
}
2841
}
42+
43+
pub struct AsyncOsReader(Async<os_pipe::PipeReader>);
44+
45+
impl From<os_pipe::PipeReader> for AsyncOsReader {
46+
fn from(value: os_pipe::PipeReader) -> Self {
47+
Self(Async::new(value).expect("Cannot create async wrapper"))
48+
}
49+
}
50+
51+
impl AsFd for AsyncOsReader {
52+
fn as_fd(&self) -> std::os::unix::prelude::BorrowedFd<'_> {
53+
self.0.as_fd()
54+
}
55+
}
56+
57+
impl AsyncOsReader {
58+
pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
59+
_ = self.0.readable().await?;
60+
unsafe { self.0.get_mut() }.read(buf)
61+
}
62+
63+
pub async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
64+
let mut total_bytes = 0;
65+
66+
loop {
67+
let read_bytes = self.read(buf.as_mut_slice()).await?;
68+
69+
if read_bytes == 0 {
70+
break;
71+
}
72+
73+
total_bytes += read_bytes;
74+
}
75+
76+
Ok(total_bytes)
77+
}
78+
}
79+
80+
impl ops::Deref for AsyncOsReader {
81+
type Target = os_pipe::PipeReader;
82+
83+
fn deref(&self) -> &Self::Target {
84+
&self.0.get_ref()
85+
}
86+
}

backend/runner/src/lib.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
pub mod error;
22
pub mod hakoniwa_ext;
33

4+
use async_io::Async;
45
use hakoniwa::{Child, Command, Container, ExitStatus, Output};
6+
use hakoniwa_ext::{AsyncOsReader, HakoniwaChildExt};
57
use nix::libc::pid_t;
68
use nix::sys::signal::{self, Signal};
79
use nix::unistd::Pid;
810
pub use os_pipe::{PipeReader, PipeWriter};
911
use std::future::Future;
1012
use std::io::Read;
1113
use std::path::{Path, PathBuf};
14+
use std::time::Duration;
1215
use tokio::sync::oneshot;
16+
use tokio::time::Interval;
1317
use tokio::{fs, io};
1418

1519
pub const BASE_ENV: [(&str, &str); 3] = [
@@ -92,12 +96,12 @@ impl Runner {
9296
}
9397

9498
pub async fn collect_output(cmd: &mut Command) -> Result<Output, hakoniwa::Error> {
95-
async fn collect(stream: Option<PipeReader>) -> Vec<u8> {
99+
async fn collect(stream: Option<AsyncOsReader>) -> Vec<u8> {
96100
let mut buf = Vec::new();
97101

98102
let Some(mut stream) = stream else { return buf };
99103

100-
_ = stream.read_to_end(&mut buf);
104+
_ = stream.read_to_end(&mut buf).await;
101105

102106
buf
103107
}
@@ -120,10 +124,10 @@ impl Runner {
120124
where
121125
Stdout: Default + Send + 'static,
122126
StdoutAsync: Future<Output = Stdout> + Send + 'static,
123-
StdoutFn: FnOnce(Option<PipeReader>) -> StdoutAsync,
127+
StdoutFn: FnOnce(Option<AsyncOsReader>) -> StdoutAsync,
124128
Stderr: Default + Send + 'static,
125129
StderrAsync: Future<Output = Stderr> + Send + 'static,
126-
StderrFn: FnOnce(Option<PipeReader>) -> StderrAsync,
130+
StderrFn: FnOnce(Option<AsyncOsReader>) -> StderrAsync,
127131
{
128132
let mut child = cmd
129133
.envs(BASE_ENV)
@@ -133,18 +137,24 @@ impl Runner {
133137
.stderr(hakoniwa::Stdio::MakePipe)
134138
.spawn()?;
135139

136-
let stdout = child.stdout.take();
140+
let stdout = child.stdout.take().map(AsyncOsReader::from);
137141
let stdout = tokio::spawn(stdout_fn(stdout));
138-
let stderr = child.stderr.take();
142+
let stderr = child.stderr.take().map(AsyncOsReader::from);
139143
let stderr = tokio::spawn(stderr_fn(stderr));
140144

141145
let child_pid = Pid::from_raw(child.id() as pid_t);
142-
let status = if let Some(abort) = abort {
146+
let status = if let Some(mut abort) = abort {
143147
tokio::spawn(async move {
148+
let mut status_check_interval = tokio::time::interval(Duration::from_millis(100));
149+
144150
loop {
145151
tokio::select! {
146-
_ = abort => {
147-
println!("[[[[KILL]]]]");
152+
_ = status_check_interval.tick() => {
153+
if let Some(status) = child.try_wait() {
154+
return Ok(status)
155+
}
156+
}
157+
_ = &mut abort => {
148158
_ = signal::kill(child_pid, Signal::SIGKILL);
149159
return Ok(ExitStatus {
150160
code: 137,
@@ -153,14 +163,6 @@ impl Runner {
153163
rusage: None,
154164
});
155165
},
156-
else => {
157-
return Ok(ExitStatus {
158-
code: 137,
159-
reason: "Aborted".to_owned(),
160-
exit_code: None,
161-
rusage: None,
162-
});
163-
}
164166
}
165167
}
166168
})

backend/src/project/project_runner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ async fn execute(project: ProjectExecuter) -> Result<(), ()> {
8383
let buf = &mut [0; 2048];
8484

8585
loop {
86-
let Ok(size) = stdout.read(buf) else {
86+
let Ok(size) = stdout.read(buf).await else {
8787
log::trace!("Cannot read");
8888
break;
8989
};

0 commit comments

Comments
 (0)