Skip to content

Commit bb2fde7

Browse files
authored
feat: refactor the code structure of start and run, allowing for parallel starting daemons (#56)
* feat: refactor the code structure of `start` and `run`, allowing for parallel starting daemons 1. Use tokio to run daemons in parallel, which avoids waiting 3 secs (by default) for EACH daemon. 2. Now that pitchfork will always exit with code 1, regardless of what exit code the daemons give. And we have already printed messages like `daemon xxx exited with code xxx`. 3. Move most of the message output to ipc client, to avoid duplicating them in `start` and `run` command. * fix: pass exit code on retrying correctly
1 parent 67e0445 commit bb2fde7

File tree

7 files changed

+153
-97
lines changed

7 files changed

+153
-97
lines changed

README.md

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,12 @@ run = "npm run server:api"
8888
run = "npm run server:docs"
8989
```
9090

91-
Start all daemons:
91+
Start all daemons or mutiple daemons **in parallel**:
9292

9393
```sh-session
9494
$ pitchfork start --all
95-
```
96-
97-
Or start individual ones:
9895

99-
```sh-session
100-
$ pitchfork start redis
96+
$ pitchfork start redis api
10197
```
10298

10399
### Shell hook (auto start/stop)
@@ -144,7 +140,7 @@ ready.http = { url = "http://localhost:5432" }
144140
[daemons.redis]
145141
run = "redis-server --port 6379"
146142
auto = ["start", "stop"]
147-
ready.delay = "2s"
143+
ready.delay = 2
148144

149145
[daemons.api]
150146
run = "npm run dev:api"

docs/getting-started.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ This project is experimental. It works in basic situations but you'll undoubtedl
1818
- Auto start daemons when entering a project directory - then auto stop when leaving
1919
- Restart daemons on failure
2020
- Cron jobs
21-
- 🚧 Global configuration
22-
- 🚧 Automatically start daemons on boot
21+
- Global configuration
22+
- Automatically start daemons on boot
2323

2424
## Workflows
2525

src/cli/run.rs

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@ pub struct Run {
2626

2727
impl Run {
2828
pub async fn run(&self) -> Result<()> {
29-
info!("Running one-off daemon");
3029
if self.run.is_empty() {
3130
bail!("No command provided");
3231
}
3332

34-
let start_time = chrono::Local::now();
3533
let ipc = IpcClient::connect(true).await?;
3634

37-
let (started, exit_code) = ipc
35+
let (_started, exit_code) = ipc
3836
.run(RunOptions {
3937
id: self.id.clone(),
4038
cmd: self.run.clone(),
@@ -52,20 +50,8 @@ impl Run {
5250
})
5351
.await?;
5452

55-
if !started.is_empty() {
56-
info!("started {}", started.join(", "));
57-
}
58-
59-
if let Some(code) = exit_code {
60-
error!("Daemon '{}' failed with exit code {}", self.id, code);
61-
62-
// Print logs from the time we started
63-
if let Err(e) = crate::cli::logs::print_logs_for_time_range(&self.id, start_time, None)
64-
{
65-
error!("Failed to print logs: {}", e);
66-
}
67-
68-
std::process::exit(code);
53+
if exit_code.is_some() {
54+
std::process::exit(1);
6955
}
7056
Ok(())
7157
}

src/cli/start.rs

Lines changed: 103 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::daemon::RunOptions;
22
use crate::ipc::client::IpcClient;
33
use crate::pitchfork_toml::PitchforkToml;
44
use crate::Result;
5-
use miette::{ensure, IntoDiagnostic};
6-
use std::collections::HashSet;
5+
use miette::ensure;
6+
use std::sync::Arc;
77

88
/// Starts a daemon from a pitchfork.toml file
99
#[derive(Debug, clap::Args)]
@@ -34,86 +34,131 @@ impl Start {
3434
"At least one daemon ID must be provided"
3535
);
3636
let pt = PitchforkToml::all_merged();
37-
let ipc = IpcClient::connect(true).await?;
37+
let ipc = Arc::new(IpcClient::connect(true).await?);
3838
let disabled_daemons = ipc.get_disabled_daemons().await?;
39-
let active_daemons: HashSet<String> = ipc
40-
.active_daemons()
41-
.await?
42-
.into_iter()
43-
.map(|d| d.id)
44-
.collect();
4539
let ids = if self.all {
4640
pt.daemons.keys().cloned().collect()
4741
} else {
4842
self.id.clone()
4943
};
50-
let mut any_failed = false;
51-
let mut last_exit_code = 0;
44+
// launch all tasks concurrently
45+
let mut tasks = Vec::new();
5246

53-
for id in &ids {
54-
if disabled_daemons.contains(id) {
47+
for id in ids {
48+
if disabled_daemons.contains(&id) {
5549
warn!("Daemon {} is disabled", id);
5650
continue;
5751
}
58-
if !self.force && active_daemons.contains(id) {
59-
warn!("Daemon {} is already running", id);
60-
continue;
61-
}
62-
let daemon = pt.daemons.get(id);
63-
if let Some(daemon) = daemon {
64-
info!("Starting daemon {}", id);
65-
let start_time = chrono::Local::now();
66-
let cmd = shell_words::split(&daemon.run).into_diagnostic()?;
67-
let (started, exit_code) = ipc
52+
53+
let daemon_data = match pt.daemons.get(&id) {
54+
Some(d) => {
55+
let run = d.run.clone();
56+
let auto_stop = d
57+
.auto
58+
.contains(&crate::pitchfork_toml::PitchforkTomlAuto::Stop);
59+
let dir = d
60+
.path
61+
.as_ref()
62+
.and_then(|p| p.parent())
63+
.map(|p| p.to_path_buf())
64+
.unwrap_or_default();
65+
let cron_schedule = d.cron.as_ref().map(|c| c.schedule.clone());
66+
let cron_retrigger = d.cron.as_ref().map(|c| c.retrigger);
67+
let retry = d.retry;
68+
let ready_delay = d.ready_delay;
69+
let ready_output = d.ready_output.clone();
70+
71+
(
72+
run,
73+
auto_stop,
74+
dir,
75+
cron_schedule,
76+
cron_retrigger,
77+
retry,
78+
ready_delay,
79+
ready_output,
80+
)
81+
}
82+
None => {
83+
warn!("Daemon {} not found", id);
84+
continue;
85+
}
86+
};
87+
88+
let (
89+
run,
90+
auto_stop,
91+
dir,
92+
cron_schedule,
93+
cron_retrigger,
94+
retry,
95+
ready_delay,
96+
ready_output,
97+
) = daemon_data;
98+
99+
let ipc_clone = ipc.clone();
100+
let shell_pid = self.shell_pid;
101+
let force = self.force;
102+
let delay = self.delay;
103+
let output = self.output.clone();
104+
105+
let task = tokio::spawn(async move {
106+
let cmd = match shell_words::split(&run) {
107+
Ok(c) => c,
108+
Err(e) => {
109+
error!("Failed to parse command for daemon {}: {}", id, e);
110+
return Some(1);
111+
}
112+
};
113+
114+
match ipc_clone
68115
.run(RunOptions {
69116
id: id.clone(),
70117
cmd,
71-
shell_pid: self.shell_pid,
72-
force: self.force,
73-
autostop: daemon
74-
.auto
75-
.contains(&crate::pitchfork_toml::PitchforkTomlAuto::Stop),
76-
dir: daemon
77-
.path
78-
.as_ref()
79-
.unwrap()
80-
.parent()
81-
.map(|p| p.to_path_buf())
82-
.unwrap_or_default(),
83-
cron_schedule: daemon.cron.as_ref().map(|c| c.schedule.clone()),
84-
cron_retrigger: daemon.cron.as_ref().map(|c| c.retrigger),
85-
retry: daemon.retry,
118+
shell_pid,
119+
force,
120+
autostop: auto_stop,
121+
dir,
122+
cron_schedule,
123+
cron_retrigger,
124+
retry,
86125
retry_count: 0,
87-
ready_delay: self.delay.or(daemon.ready_delay).or(Some(3)),
88-
ready_output: self.output.clone().or(daemon.ready_output.clone()),
126+
ready_delay: delay.or(ready_delay).or(Some(3)),
127+
ready_output: output.or(ready_output),
89128
wait_ready: true,
90129
})
91-
.await?;
92-
if !started.is_empty() {
93-
info!("started {}", started.join(", "));
130+
.await
131+
{
132+
Ok((_started, exit_code)) => exit_code,
133+
Err(e) => {
134+
error!("Failed to start daemon {}: {}", id, e);
135+
Some(1)
136+
}
94137
}
95-
if let Some(code) = exit_code {
96-
any_failed = true;
97-
last_exit_code = code;
98-
error!("daemon {} failed with exit code {}", id, code);
138+
});
139+
140+
tasks.push(task);
141+
}
99142

100-
// Print logs from the time we started this specific daemon
101-
if let Err(e) =
102-
crate::cli::logs::print_logs_for_time_range(id, start_time, None)
103-
{
104-
error!("Failed to print logs: {}", e);
143+
// wait for all tasks to complete
144+
let mut any_failed = false;
145+
146+
for task in tasks {
147+
match task.await {
148+
Ok(exit_code) => {
149+
if exit_code.is_some() {
150+
any_failed = true;
105151
}
106152
}
107-
} else {
108-
warn!("Daemon {} not found", id);
153+
Err(e) => {
154+
error!("Task panicked: {}", e);
155+
any_failed = true;
156+
}
109157
}
110158
}
111159

112160
if any_failed {
113-
if last_exit_code != 0 {
114-
error!("Process exited with code {}", last_exit_code);
115-
}
116-
std::process::exit(last_exit_code);
161+
std::process::exit(1);
117162
}
118163
Ok(())
119164
}

src/ipc/client.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,25 +135,45 @@ impl IpcClient {
135135
}
136136

137137
pub async fn run(&self, opts: RunOptions) -> Result<(Vec<String>, Option<i32>)> {
138-
debug!("starting daemon {}", opts.id);
138+
info!("starting daemon {}", opts.id);
139+
let start_time = chrono::Local::now();
139140
let rsp = self.request(IpcRequest::Run(opts.clone())).await?;
140141
let mut started_daemons = vec![];
141142
let mut exit_code = None;
142143
match rsp {
143144
IpcResponse::DaemonStart { daemon } => {
144-
started_daemons.push(daemon.id);
145+
started_daemons.push(daemon.id.clone());
146+
info!("started {}", daemon.id);
145147
}
146148
IpcResponse::DaemonReady { daemon } => {
147-
started_daemons.push(daemon.id);
149+
started_daemons.push(daemon.id.clone());
150+
info!("started {}", daemon.id);
148151
}
149152
IpcResponse::DaemonFailedWithCode { exit_code: code } => {
150-
exit_code = Some(code.unwrap_or(1));
153+
let code = code.unwrap_or(1);
154+
exit_code = Some(code);
155+
error!("daemon {} failed with exit code {}", opts.id, code);
156+
157+
// Print logs from the time we started this specific daemon
158+
if let Err(e) =
159+
crate::cli::logs::print_logs_for_time_range(&opts.id, start_time, None)
160+
{
161+
error!("Failed to print logs: {}", e);
162+
}
151163
}
152164
IpcResponse::DaemonAlreadyRunning => {
153165
warn!("daemon {} already running", opts.id);
154166
}
155167
IpcResponse::DaemonFailed { error } => {
156-
bail!("failed to start daemon {}: {error}", opts.id);
168+
error!("Failed to start daemon {}: {}", opts.id, error);
169+
exit_code = Some(1);
170+
171+
// Print logs from the time we started this specific daemon
172+
if let Err(e) =
173+
crate::cli::logs::print_logs_for_time_range(&opts.id, start_time, None)
174+
{
175+
error!("Failed to print logs: {}", e);
176+
}
157177
}
158178
rsp => unreachable!("unexpected response: {rsp:?}"),
159179
}

src/supervisor.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -486,12 +486,25 @@ impl Supervisor {
486486
debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
487487
if !ready_notified {
488488
if let Some(tx) = ready_tx.take() {
489-
let exit_code = exit_status.as_ref().and_then(|r| r.as_ref().ok().and_then(|s| s.code()));
490-
debug!("daemon {id} not ready yet, sending failure notification with exit_code: {:?}", exit_code);
491-
let _ = tx.send(Err(exit_code));
489+
// Check if process exited successfully
490+
let is_success = exit_status.as_ref()
491+
.and_then(|r| r.as_ref().ok())
492+
.map(|s| s.success())
493+
.unwrap_or(false);
494+
495+
if is_success {
496+
debug!("daemon {id} exited successfully before ready check, sending success notification");
497+
let _ = tx.send(Ok(()));
498+
} else {
499+
let exit_code = exit_status.as_ref()
500+
.and_then(|r| r.as_ref().ok())
501+
.and_then(|s| s.code());
502+
debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
503+
let _ = tx.send(Err(exit_code));
504+
}
492505
}
493506
} else {
494-
debug!("daemon {id} was already marked ready, not sending failure notification");
507+
debug!("daemon {id} was already marked ready, not sending notification");
495508
}
496509
break;
497510
}

tests/test_e2e.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -624,10 +624,6 @@ ready_output = "NEVER_APPEARS"
624624

625625
// When ready_output is set but never matches, it blocks until daemon exits
626626
// slowly_output.ts outputs 5 times with 1s interval, so runs for ~5 seconds
627-
assert!(
628-
output.status.success(),
629-
"Start should succeed when daemon exits cleanly even if pattern never matches"
630-
);
631627
assert!(
632628
elapsed >= Duration::from_secs(3),
633629
"Should block until daemon exits (~3s), took {:?}",

0 commit comments

Comments
 (0)