diff --git a/Cargo.lock b/Cargo.lock
index 33d4c1e..4bd553f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2926,6 +2926,7 @@ dependencies = [
"reqwest 0.12.19",
"secp256k1",
"serde",
+ "serde_wormhole",
"sha3",
"solana-account-decoder",
"solana-client",
diff --git a/Cargo.toml b/Cargo.toml
index e12efcf..5f824db 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,7 @@ hex = { version = "0.4.3", features = ["serde"] }
reqwest = { version = "0.12.19", features = ["json"] }
secp256k1 = { version = "0.31.0", features = ["recovery"] }
serde = "1.0.219"
+serde_wormhole = "0.1.0"
sha3 = "0.10.8"
solana-account-decoder = "2.2.7"
solana-client = "2.2.7"
diff --git a/src/api_client.rs b/src/api_client.rs
index e8210cc..8e0e4ec 100644
--- a/src/api_client.rs
+++ b/src/api_client.rs
@@ -23,13 +23,23 @@ pub struct ApiClient {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
#[derive(Serialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
-pub struct Observation
{
+pub struct Observation {
pub version: u8,
#[serde(with = "hex::serde")]
pub signature: [u8; 65],
+ #[serde(serialize_with = "serialize_body")]
pub body: Body,
}
+fn serialize_body(body: &Body
, serializer: S) -> Result
+where
+ S: serde::Serializer,
+ P: Serialize,
+{
+ let serialized = serde_wormhole::to_vec(body).map_err(serde::ser::Error::custom)?;
+ serializer.serialize_bytes(&serialized)
+}
+
impl Observation {
pub fn try_new(body: Body
, secret_key: SecretKey) -> Result {
let digest = body.digest()?;
diff --git a/src/main.rs b/src/main.rs
index d47dbb3..c567312 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,6 +4,7 @@ use {
clap::Parser,
posted_message::PostedMessageUnreliableData,
secp256k1::SecretKey,
+ serde_wormhole::RawMessage,
solana_account_decoder::UiAccountEncoding,
solana_client::{
nonblocking::pubsub_client::PubsubClient,
@@ -93,31 +94,30 @@ async fn run_listener(input: RunListenerInput) -> Result<(), PubsubClientError>
continue;
}
- let body = Body {
- timestamp: unreliable_data.submission_time,
- nonce: unreliable_data.nonce,
- emitter_chain: unreliable_data.emitter_chain.into(),
- emitter_address: Address(unreliable_data.emitter_address),
- sequence: unreliable_data.sequence,
- consistency_level: unreliable_data.consistency_level,
- payload: unreliable_data.payload.clone(),
- };
-
- match Observation::try_new(body, input.secret_key) {
- Ok(observation) => {
- tokio::spawn({
- let api_client = input.api_client.clone();
- async move {
+ tokio::spawn({
+ let api_client = input.api_client.clone();
+ async move {
+ let body = Body {
+ timestamp: unreliable_data.submission_time,
+ nonce: unreliable_data.nonce,
+ emitter_chain: unreliable_data.emitter_chain.into(),
+ emitter_address: Address(unreliable_data.emitter_address),
+ sequence: unreliable_data.sequence,
+ consistency_level: unreliable_data.consistency_level,
+ payload: RawMessage::new(unreliable_data.payload.as_slice()),
+ };
+ match Observation::try_new(body.clone(), input.secret_key) {
+ Ok(observation) => {
if let Err(e) = api_client.post_observation(observation).await {
tracing::error!(error = ?e, "Failed to post observation");
} else {
tracing::info!("Observation posted successfully");
- }
+ };
}
- });
+ Err(e) => tracing::error!(error = ?e, "Failed to create observation"),
+ }
}
- Err(e) => tracing::error!(error = ?e, "Failed to create observation"),
- };
+ });
}
tokio::spawn(async move { unsubscribe().await });