Skip to content
This repository was archived by the owner on Mar 25, 2023. It is now read-only.

Commit 9a76972

Browse files
authored
fpm-controller-api (#215)
* FPM Controller integration
1 parent 78e45a3 commit 9a76972

File tree

9 files changed

+287
-14
lines changed

9 files changed

+287
-14
lines changed

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ repository = "https://github.com/FifthTry/fpm"
1313
homepage = "https://fpm.dev"
1414
build = "build.rs"
1515

16+
[features]
17+
controller = []
18+
1619
[dependencies]
1720
async-recursion = "0.3.2"
1821
camino = "1.0.5"

src/commands/serve.rs

+13
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,19 @@ async fn serve_static(req: actix_web::HttpRequest) -> actix_web::HttpResponse {
145145

146146
#[actix_web::main]
147147
pub async fn serve(bind_address: &str, port: &str) -> std::io::Result<()> {
148+
if cfg!(feature = "controller") {
149+
// fpm-controller base path and ec2 instance id (hardcoded for now)
150+
let fpm_controller: String = std::env::var("FPM_CONTROLLER")
151+
.unwrap_or_else(|_| "https://controller.fifthtry.com".to_string());
152+
let fpm_instance: String =
153+
std::env::var("FPM_INSTANCE_ID").expect("FPM_INSTANCE_ID is required");
154+
155+
match crate::controller::resolve_dependencies(fpm_instance, fpm_controller).await {
156+
Ok(_) => println!("Dependencies resolved"),
157+
Err(e) => panic!("Error resolving dependencies using controller!!: {:?}", e),
158+
}
159+
}
160+
148161
println!("### Server Started ###");
149162
println!("Go to: http://{}:{}", bind_address, port);
150163
actix_web::HttpServer::new(|| {

src/config.rs

+5
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,11 @@ impl Package {
705705
}
706706
}
707707

708+
pub fn with_zip(mut self, zip: String) -> fpm::Package {
709+
self.zip = Some(zip);
710+
self
711+
}
712+
708713
pub fn get_dependency_for_interface(&self, interface: &str) -> Option<&fpm::Dependency> {
709714
self.dependencies
710715
.iter()

src/controller.rs

+155
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/// FPM Controller Support
2+
/// FPM cli supports communication with fpm controller. This is an optional feature, and is only
3+
/// available when controller feature is enabled, which is not enabled by default.
4+
/// Controller Communication
5+
/// When controller feature is enabled, fpm serve will first communicate with the FPM controller
6+
/// service’s /get-package/ API.
7+
8+
/// FPM Controller Service Endpoint
9+
/// The FPM Controller Service’s endpoint is computed by using environment variable FPM_CONTROLLER,
10+
/// which will look something like this: https://controller.fifthtry.com, with the API path.
11+
/// FPM Controller Service has more than one APIs: /get-package/ and /fpm-ready/.
12+
13+
/// get-package:
14+
/// Through an environment variable FPM_INSTANCE_ID, the fpm serve will learn it’s instance id, and
15+
/// it will pass the instance id to the get-package API.
16+
/// The API returns the URL of the package to be downloaded, git repository URL and the package name.
17+
/// FPM will clone the git repository in the current directory. The current directory will contain
18+
/// FPM.ftd and other files of the package.
19+
/// FPM will then calls fpm install on it.
20+
21+
/// fpm-ready:
22+
/// Once dependencies are ready fpm calls /fpm-ready/ API on the controller. We will pass the
23+
/// FPM_INSTANCE_ID and the git commit hash as input to the API
24+
/// The API will return with success, and once it is done fpm will start receiving HTTP traffic
25+
/// from the controller service.
26+
27+
#[derive(serde::Deserialize, Debug)]
28+
struct ApiResponse<T> {
29+
success: bool,
30+
result: Option<T>,
31+
message: Option<String>,
32+
}
33+
34+
#[derive(serde::Deserialize, Debug)]
35+
struct PackageResult {
36+
package: String,
37+
git: String,
38+
}
39+
40+
pub async fn resolve_dependencies(fpm_instance: String, fpm_controller: String) -> fpm::Result<()> {
41+
// First call get_package API to get package details and resolve dependencies
42+
43+
// response from get-package API
44+
let package_response = get_package(fpm_instance.as_str(), fpm_controller.as_str()).await?;
45+
46+
// Clone the git package into the current directory
47+
// Need to execute shell commands from rust
48+
// git_url https format: https://github.com/<user>/<repo>.git
49+
50+
let package =
51+
fpm::Package::new(package_response.package.as_str()).with_zip(package_response.git);
52+
53+
package.unzip_package().await?;
54+
fpm::Config::read(None).await?;
55+
56+
/*let out = std::process::Command::new("git")
57+
.arg("clone")
58+
.arg(git_url)
59+
.output()
60+
.expect("unable to execute git clone command");
61+
62+
if out.status.success() {
63+
// By this time the cloned repo should be available in the current directory
64+
println!("Git cloning successful for the package {}", package_name);
65+
// Resolve dependencies by reading the FPM.ftd using config.read()
66+
// Assuming package_name and repo name are identical
67+
let _config = fpm::Config::read(Some(package_name.to_string())).await?;
68+
}*/
69+
70+
// Once the dependencies are resolved for the package
71+
// then call fpm_ready API to ensure that the controller service is now ready
72+
73+
// response from fpm_ready API
74+
75+
fpm_ready(fpm_instance.as_str(), fpm_controller.as_str()).await?;
76+
77+
Ok(())
78+
}
79+
80+
/// get-package API
81+
/// input: fpm_instance
82+
/// output: package_name and git repo URL
83+
/// format: {
84+
/// "success": true,
85+
/// "result": {
86+
/// "package": "<package name>"
87+
/// "git": "<git url>"
88+
/// }
89+
/// }
90+
async fn get_package(fpm_instance: &str, fpm_controller: &str) -> fpm::Result<PackageResult> {
91+
let controller_api = format!(
92+
"{}/v1/fpm/get-package?ec2_reservation={}",
93+
fpm_controller, fpm_instance
94+
);
95+
96+
let url = url::Url::parse(controller_api.as_str())?;
97+
98+
let mut headers = reqwest::header::HeaderMap::new();
99+
headers.insert(
100+
reqwest::header::USER_AGENT,
101+
reqwest::header::HeaderValue::from_static("fpm"),
102+
);
103+
104+
let resp: ApiResponse<PackageResult> = fpm::library::http::get_with_type(url, headers).await?;
105+
106+
if !resp.success {
107+
return Err(fpm::Error::APIResponseError(format!(
108+
"get_package api error: {:?}",
109+
resp.message
110+
)));
111+
}
112+
113+
resp.result.ok_or({
114+
fpm::Error::APIResponseError(format!("get_package api error: {:?}", &resp.message))
115+
})
116+
}
117+
118+
/// fpm-ready API
119+
/// input: fpm_instance, *(git commit hash)
120+
/// output: success: true/false
121+
/// format: lang: json
122+
/// {
123+
/// "success": true
124+
/// }
125+
126+
/// Git commit hash needs to be computed before making a call to the fpm_ready API
127+
async fn fpm_ready(fpm_instance: &str, fpm_controller: &str) -> fpm::Result<()> {
128+
let git_commit = "<dummy-git-commit-hash-xxx123>";
129+
130+
let controller_api = format!(
131+
"{}/v1/fpm/fpm-ready?ec2_reservation={}&hash={}",
132+
fpm_controller, fpm_instance, git_commit
133+
);
134+
135+
let url = url::Url::parse(controller_api.as_str())?;
136+
137+
// This request should be put request for fpm_ready API to update the instance status to ready
138+
// Using http::_get() function to make request to this API for now
139+
let mut headers = reqwest::header::HeaderMap::new();
140+
headers.insert(
141+
reqwest::header::USER_AGENT,
142+
reqwest::header::HeaderValue::from_static("fpm"),
143+
);
144+
145+
// TODO: here Map is wrong,
146+
let resp: ApiResponse<std::collections::HashMap<String, String>> =
147+
fpm::library::http::get_with_type(url, headers).await?;
148+
if !resp.success {
149+
return Err(fpm::Error::APIResponseError(format!(
150+
"fpm_ready api error: {:?}",
151+
resp.message
152+
)));
153+
}
154+
Ok(())
155+
}

src/dependency.rs

+68
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,74 @@ impl fpm::Package {
311311
}
312312
}
313313

314+
pub(crate) async fn unzip_package(&self) -> fpm::Result<()> {
315+
use std::convert::TryInto;
316+
use std::io::Write;
317+
318+
let download_url = if let Some(ref url) = self.zip {
319+
url
320+
} else {
321+
return Ok(());
322+
};
323+
324+
let path = std::env::temp_dir().join(format!("{}.zip", self.name.replace("/", "__")));
325+
326+
let start = std::time::Instant::now();
327+
print!("Downloading {} ... ", self.name.as_str());
328+
std::io::stdout().flush()?;
329+
// Download the zip folder
330+
{
331+
let mut response = if download_url[1..].contains("://")
332+
|| download_url.starts_with("//")
333+
{
334+
reqwest::get(download_url.as_str())?
335+
} else if let Ok(response) = reqwest::get(format!("https://{}", download_url).as_str())
336+
{
337+
response
338+
} else {
339+
reqwest::get(format!("http://{}", download_url).as_str())?
340+
};
341+
let mut file = std::fs::File::create(&path)?;
342+
// TODO: instead of reading the whole thing in memory use tokio::io::copy() somehow?
343+
let mut buf: Vec<u8> = vec![];
344+
response.copy_to(&mut buf)?;
345+
file.write_all(&buf)?;
346+
// file.write_all(response.text().await?.as_bytes())?;
347+
}
348+
349+
let file = std::fs::File::open(&path)?;
350+
// TODO: switch to async_zip crate
351+
let mut archive = zip::ZipArchive::new(file)?;
352+
for i in 0..archive.len() {
353+
let mut c_file = archive.by_index(i).unwrap();
354+
let out_path = match c_file.enclosed_name() {
355+
Some(path) => path.to_owned(),
356+
None => continue,
357+
};
358+
let out_path_without_folder = out_path.to_str().unwrap().split_once("/").unwrap().1;
359+
let file_extract_path = {
360+
let mut file_extract_path: camino::Utf8PathBuf =
361+
std::env::current_dir()?.canonicalize()?.try_into()?;
362+
file_extract_path = file_extract_path.join(out_path_without_folder);
363+
file_extract_path
364+
};
365+
if (&*c_file.name()).ends_with('/') {
366+
std::fs::create_dir_all(&file_extract_path)?;
367+
} else {
368+
if let Some(p) = file_extract_path.parent() {
369+
if !p.exists() {
370+
std::fs::create_dir_all(p)?;
371+
}
372+
}
373+
// Note: we will be able to use tokio::io::copy() with async_zip
374+
let mut outfile = std::fs::File::create(file_extract_path)?;
375+
std::io::copy(&mut c_file, &mut outfile)?;
376+
}
377+
}
378+
fpm::utils::print_end(format!("Downloaded {}", self.name.as_str()).as_str(), start);
379+
Ok(())
380+
}
381+
314382
/// This function is called by `process()` or recursively called by itself.
315383
/// It checks the `FPM.ftd` file of dependent package and find out all the dependency packages.
316384
/// If dependent package is not available, it calls `process()` to download it inside `.packages` directory

src/lib.rs

+7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub(crate) mod utils;
1010
mod auto_import;
1111
mod commands;
1212
mod config;
13+
mod controller;
1314
mod dependency;
1415
mod doc;
1516
mod file;
@@ -390,6 +391,9 @@ pub enum Error {
390391
#[error("HttpError: {}", _0)]
391392
HttpError(#[from] reqwest::Error),
392393

394+
#[error("APIResponseError: {}", _0)]
395+
APIResponseError(String),
396+
393397
#[error("IoError: {}", _0)]
394398
IoError(#[from] std::io::Error),
395399

@@ -416,6 +420,9 @@ pub enum Error {
416420

417421
#[error("SitemapParseError: {}", _0)]
418422
SitemapParseError(#[from] fpm::sitemap::ParseError),
423+
424+
#[error("URLParseError: {}", _0)]
425+
UrlParseError(#[from] url::ParseError),
419426
}
420427

421428
pub type Result<T> = std::result::Result<T, Error>;

src/library/http.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub async fn processor<'a>(
5353
doc.from_json(&json, section)
5454
}
5555

56-
async fn get(
56+
pub(crate) async fn get(
5757
url: url::Url,
5858
doc_id: &str,
5959
line_number: usize,
@@ -89,3 +89,23 @@ async fn _get(url: url::Url) -> reqwest::Result<String> {
8989
.build()?;
9090
c.get(url.to_string().as_str()).send()?.text()
9191
}
92+
93+
pub async fn get_with_type<T: serde::de::DeserializeOwned>(
94+
url: url::Url,
95+
headers: reqwest::header::HeaderMap,
96+
) -> fpm::Result<T> {
97+
let c = reqwest::Client::builder()
98+
.default_headers(headers)
99+
.build()?;
100+
101+
let mut resp = c.get(url.to_string().as_str()).send()?;
102+
if !resp.status().eq(&reqwest::StatusCode::OK) {
103+
return Err(fpm::Error::APIResponseError(format!(
104+
"url: {}, response_status: {}, response: {:?}",
105+
url,
106+
resp.status(),
107+
resp.text()
108+
)));
109+
}
110+
Ok(resp.json()?)
111+
}

src/library/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
mod fpm_dot_ftd;
22
mod get_data;
33
mod get_version_data;
4-
mod http;
4+
pub(crate) mod http;
55
mod include;
66
mod sitemap;
77
mod sqlite;

src/main.rs

+14-12
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,20 @@ async fn main() -> fpm::Result<()> {
1212
return Ok(());
1313
}
1414

15+
// Serve block moved up
16+
if let Some(mark) = matches.subcommand_matches("serve") {
17+
let port = mark
18+
.value_of("port")
19+
.unwrap_or_else(|| mark.value_of("positional_port").unwrap_or("8000"))
20+
.to_string();
21+
let bind = mark.value_of("bind").unwrap_or("127.0.0.1").to_string();
22+
tokio::task::spawn_blocking(move || {
23+
fpm::serve(bind.as_str(), port.as_str()).expect("http service error");
24+
})
25+
.await
26+
.expect("Thread spawn error");
27+
}
28+
1529
let mut config = fpm::Config::read(None).await?;
1630

1731
if matches.subcommand_matches("update").is_some() {
@@ -72,18 +86,6 @@ async fn main() -> fpm::Result<()> {
7286
let target = mark.value_of("target");
7387
fpm::stop_tracking(&config, source, target).await?;
7488
}
75-
if let Some(mark) = matches.subcommand_matches("serve") {
76-
let port = mark
77-
.value_of("port")
78-
.unwrap_or_else(|| mark.value_of("positional_port").unwrap_or("8000"))
79-
.to_string();
80-
let bind = mark.value_of("bind").unwrap_or("127.0.0.1").to_string();
81-
tokio::task::spawn_blocking(move || {
82-
fpm::serve(bind.as_str(), port.as_str()).expect("http service error");
83-
})
84-
.await
85-
.expect("Thread spawn error");
86-
}
8789
Ok(())
8890
}
8991

0 commit comments

Comments
 (0)