Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 9 additions & 19 deletions mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3500,7 +3500,7 @@ pub trait MmCoin: SwapOps + WatcherOps + MarketCoinOps + Send + Sync + 'static {
// BCH cash address format has colon after prefix, e.g. bitcoincash:
// Colon can't be used in file names on Windows so it should be escaped
let my_address = my_address.replace(':', "_");
ctx.dbdir()
ctx.address_dir(&my_address)
.join("TRANSACTIONS")
.join(format!("{}_{}.json", self.ticker(), my_address))
}
Expand All @@ -3512,7 +3512,7 @@ pub trait MmCoin: SwapOps + WatcherOps + MarketCoinOps + Send + Sync + 'static {
// BCH cash address format has colon after prefix, e.g. bitcoincash:
// Colon can't be used in file names on Windows so it should be escaped
let my_address = my_address.replace(':', "_");
ctx.dbdir()
ctx.address_dir(&my_address)
.join("TRANSACTIONS")
.join(format!("{}_{}_migration", self.ticker(), my_address))
}
Expand Down Expand Up @@ -5781,6 +5781,9 @@ where

let fut = async move {
let fs_fut = async {
mm2_io::fs::create_parents_async(&migration_path)
.await
.map_err(|e| e.into_inner())?;
let mut file = fs::File::create(&tmp_file).await?;
file.write_all(&migration_number.to_le_bytes()).await?;
file.flush().await?;
Expand All @@ -5805,28 +5808,15 @@ where
T: MmCoin + MarketCoinOps + ?Sized,
{
let history_path = coin.tx_history_path(ctx);
let tmp_file = format!("{}.tmp", history_path.display());

history.sort_unstable_by(compare_transaction_details);

let fut = async move {
let content = json::to_vec(&history).map_to_mm(|e| TxHistoryError::ErrorSerializing(e.to_string()))?;

let fs_fut = async {
let mut file = fs::File::create(&tmp_file).await?;
file.write_all(&content).await?;
file.flush().await?;
fs::rename(&tmp_file, &history_path).await?;
Ok(())
};

let res: io::Result<_> = fs_fut.await;
if let Err(e) = res {
let error = format!("Error '{}' creating/writing/renaming the tmp file {}", e, tmp_file);
return MmError::err(TxHistoryError::ErrorSaving(error));
}
mm2_io::fs::write_json(&history, &history_path, true)
.await
.mm_err(|e| TxHistoryError::ErrorSaving(e.to_string()))?;
Ok(())
};

Box::new(fut.boxed().compat())
}

Expand Down
39 changes: 34 additions & 5 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ pub struct MmCtx {
/// The DB connection to the global DB hosting common data (e.g. stats) and other data needed for correctly bootstrapping on restarts.
#[cfg(all(feature = "new-db-arch", not(target_arch = "wasm32")))]
pub global_db_conn: OnceLock<Arc<Mutex<Connection>>>,
/// The DB connection to the global DB hosting common data (e.g. stats) and other data needed for correctly bootstrapping on restarts.
///
/// This is the same DB as `self.global_db_conn` but made available via an asynchronous interface.
/// Use this if favor of `self.global_db_conn` for new implementations.
#[cfg(all(feature = "new-db-arch", not(target_arch = "wasm32")))]
pub async_global_db_conn: OnceLock<Arc<AsyncMutex<AsyncConnection>>>,
/// The DB connection to the wallet DB the KDF instance will use for current execution.
///
/// The wallet DB path is based on the seed that KDF is initialized with. An initialization with different seed will use a different wallet DB.
Expand Down Expand Up @@ -205,6 +211,8 @@ impl MmCtx {
#[cfg(all(feature = "new-db-arch", not(target_arch = "wasm32")))]
global_db_conn: OnceLock::default(),
#[cfg(all(feature = "new-db-arch", not(target_arch = "wasm32")))]
async_global_db_conn: OnceLock::default(),
#[cfg(all(feature = "new-db-arch", not(target_arch = "wasm32")))]
wallet_db_conn: OnceLock::default(),
#[cfg(all(feature = "new-db-arch", not(target_arch = "wasm32")))]
async_wallet_db_conn: OnceLock::default(),
Expand Down Expand Up @@ -386,13 +394,23 @@ impl MmCtx {
if cfg!(not(feature = "new-db-arch")) {
return self.dbdir();
}
// Colon can't be used in file names on Windows so it should be escaped.
let address = address.replace(':', "_");
self.db_root().join("addresses").join(address)
}

/// Returns a SQL connection to the global database.
#[cfg(all(feature = "new-db-arch", not(target_arch = "wasm32")))]
pub fn global_db(&self) -> MutexGuard<Connection> { self.global_db_conn.get().unwrap().lock().unwrap() }

/// Returns an AsyncSQL connection to the global database.
///
/// This replaces `self.global_db()` and should be used for new implementations.
#[cfg(all(feature = "new-db-arch", not(target_arch = "wasm32")))]
pub async fn async_global_db(&self) -> Arc<AsyncMutex<AsyncConnection>> {
self.async_global_db_conn.get().unwrap().clone()
}

/// Returns a SQL connection to the shared wallet database.
///
/// For new implementations, use `self.async_wallet_db()` instead.
Expand Down Expand Up @@ -478,13 +496,19 @@ impl MmCtx {
#[cfg(all(feature = "new-db-arch", not(target_arch = "wasm32")))]
pub async fn init_global_and_wallet_db(&self) -> Result<(), String> {
let global_db = Connection::open(self.global_dir().join("global.db")).map_err(|e| e.to_string())?;
let async_global_db = AsyncConnection::open(self.global_dir().join("global.db"))
.await
.map_err(|e| e.to_string())?;
let wallet_db = Connection::open(self.wallet_dir().join("wallet.db")).map_err(|e| e.to_string())?;
let async_wallet_db = AsyncConnection::open(self.wallet_dir().join("wallet.db"))
.await
.map_err(|e| e.to_string())?;
self.global_db_conn
.set(Arc::new(Mutex::new(global_db)))
.map_err(|_| "Global DB already set".to_string())?;
self.async_global_db_conn
.set(Arc::new(AsyncMutex::new(async_global_db)))
.map_err(|_| "Async Global DB already set".to_string())?;
self.wallet_db_conn
.set(Arc::new(Mutex::new(wallet_db)))
.map_err(|_| "Wallet DB already set".to_string())?;
Expand Down Expand Up @@ -530,11 +554,6 @@ impl MmCtx {
Ok(())
}

#[cfg(not(target_arch = "wasm32"))]
pub fn sqlite_conn_opt(&self) -> Option<MutexGuard<Connection>> {
self.sqlite_connection.get().map(|conn| conn.lock().unwrap())
}

#[cfg(not(target_arch = "wasm32"))]
pub fn sqlite_connection(&self) -> MutexGuard<Connection> {
self.sqlite_connection
Expand Down Expand Up @@ -576,6 +595,16 @@ pub enum AddressDataError {
SqliteConnectionFailure(db_common::sqlite::rusqlite::Error),
}

#[cfg(not(target_arch = "wasm32"))]
impl From<AddressDataError> for String {
fn from(err: AddressDataError) -> Self {
match err {
AddressDataError::CreateAddressDirFailure(e) => format!("Failed to create address directory: {}", e),
AddressDataError::SqliteConnectionFailure(e) => format!("Failed to open SQLite connection: {}", e),
}
}
}

/// Returns the path to the MM database root.
///
/// Path priority:
Expand Down
75 changes: 75 additions & 0 deletions mm2src/mm2_main/src/database/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use db_common::{async_sql_conn::AsyncConnError,
sqlite::{query_single_row, rusqlite::params}};
use derive_more::Display;
use mm2_core::mm_ctx::MmArc;
use uuid::Uuid;

// TODO: Let's let this table be called `ongoing_swaps` and remove the `is_finished` column.
// And we might also add a new table `completed_swaps` that hold a copy of all the completed swaps for all the coins.
const INIT_GLOBAL_DB_TABLES: &str = "
CREATE TABLE IF NOT EXISTS swaps (
uuid VARCHAR(255) PRIMARY KEY,
maker_address VARCHAR(255) NOT NULL
);
";
const SELECT_ADDRESS_FOR_SWAP_UUID: &str = "SELECT maker_address FROM swaps WHERE uuid = ?1";
const INSERT_SWAP: &str = "INSERT INTO swaps (uuid, maker_address) VALUES (?1, ?2)";

/// Errors that can occur when interacting with the global database.
#[derive(Debug, Display)]
pub enum GlobalDBError {
SqlError(AsyncConnError),
}

impl From<GlobalDBError> for String {
fn from(err: GlobalDBError) -> Self {
match err {
GlobalDBError::SqlError(e) => format!("SQL error: {}", e),
}
}
}

impl From<AsyncConnError> for GlobalDBError {
fn from(err: AsyncConnError) -> Self { GlobalDBError::SqlError(err) }
}

/// Initializes the global database with the necessary tables.
pub async fn init_global_db(ctx: &MmArc) -> Result<(), GlobalDBError> {
let conn = ctx.async_global_db().await;
conn.lock()
.await
.call(|conn| conn.execute_batch(INIT_GLOBAL_DB_TABLES).map_err(|e| e.into()))
.await?;
Ok(())
}

/// Gets the maker address for a given swap UUID from the global database.
///
/// Returns `Ok(Some(addr))` if the UUID is found, `Ok(None)` if the UUID is not found, and `Err(e)` if there was an error.
pub async fn get_maker_address_for_swap_uuid(ctx: &MmArc, uuid: &Uuid) -> Result<Option<String>, GlobalDBError> {
let conn = ctx.async_global_db().await;
let uuid = uuid.to_string();
let address: Option<String> = conn
.lock()
.await
.call(move |conn| {
query_single_row(conn, SELECT_ADDRESS_FOR_SWAP_UUID, params![uuid], |row| row.get(0)).map_err(|e| e.into())
})
.await?;
Ok(address)
}

/// Inserts a new swap handle (uuid and maker address pair) into the global database.
pub async fn insert_swap_in_global_db(ctx: &MmArc, uuid: &Uuid, maker_address: &str) -> Result<(), GlobalDBError> {
let conn = ctx.async_global_db().await;
let uuid = uuid.to_string();
let maker_address = maker_address.to_string();
conn.lock()
.await
.call(move |conn| {
conn.execute(INSERT_SWAP, params![uuid, maker_address])
.map_err(|e| e.into())
})
.await?;
Ok(())
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/// The module responsible to work with SQLite database
///
#[cfg(feature = "new-db-arch")]
pub mod global;
pub mod my_orders;
pub mod my_swaps;
pub mod stats_nodes;
Expand Down
3 changes: 1 addition & 2 deletions mm2src/mm2_main/src/database/my_swaps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,14 @@ const INSERT_MY_SWAP: &str =
"INSERT INTO my_swaps (my_coin, other_coin, uuid, started_at, swap_type) VALUES (?1, ?2, ?3, ?4, ?5)";

pub fn insert_new_swap(
ctx: &MmArc,
conn: &Connection,
my_coin: &str,
other_coin: &str,
uuid: &str,
started_at: &str,
swap_type: u8,
) -> SqlResult<()> {
debug!("Inserting new swap {} to the SQLite database", uuid);
let conn = ctx.sqlite_connection();
let params = [my_coin, other_coin, uuid, started_at, &swap_type.to_string()];
conn.execute(INSERT_MY_SWAP, params).map(|_| ())
}
Expand Down
5 changes: 5 additions & 0 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// marketmaker
//

#[cfg(all(not(target_arch = "wasm32"), feature = "new-db-arch"))]
use crate::database::global::init_global_db;
#[cfg(not(target_arch = "wasm32"))]
use crate::database::init_and_migrate_sql_db;
use crate::lp_healthcheck::peer_healthcheck_topic;
Expand Down Expand Up @@ -394,6 +396,9 @@ pub async fn lp_init_continue(ctx: MmArc) -> MmInitResult<()> {
ctx.init_global_and_wallet_db()
.await
.map_to_mm(MmInitError::ErrorSqliteInitializing)?;
init_global_db(&ctx)
.await
.map_to_mm(|e| MmInitError::ErrorSqliteInitializing(e.to_string()))?;
}
}

Expand Down
8 changes: 8 additions & 0 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3244,6 +3244,10 @@ async fn start_maker_legacy_swap(
) {
if let Err(e) = insert_new_swap_to_db(
ctx.clone(),
&params
.maker_coin
.address_from_pubkey(&(*params.my_persistent_pub).into())
.unwrap(),
params.maker_coin.ticker(),
params.taker_coin.ticker(),
*params.uuid,
Expand Down Expand Up @@ -3496,6 +3500,10 @@ async fn start_taker_legacy_swap(

if let Err(e) = insert_new_swap_to_db(
ctx.clone(),
&params
.maker_coin
.address_from_pubkey(&(*params.my_persistent_pub).into())
.unwrap(),
params.taker_coin.ticker(),
params.maker_coin.ticker(),
*params.uuid,
Expand Down
16 changes: 12 additions & 4 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,23 +970,26 @@ pub fn my_swap_file_path(ctx: &MmArc, address: &str, uuid: &Uuid) -> PathBuf {

pub async fn insert_new_swap_to_db(
ctx: MmArc,
maker_address: &str,
my_coin: &str,
other_coin: &str,
uuid: Uuid,
started_at: u64,
swap_type: u8,
) -> Result<(), String> {
MySwapsStorage::new(ctx)
.save_new_swap(my_coin, other_coin, uuid, started_at, swap_type)
.save_new_swap(my_coin, other_coin, maker_address, uuid, started_at, swap_type)
.await
.map_err(|e| ERRL!("{}", e))
}

#[cfg(not(target_arch = "wasm32"))]
fn add_swap_to_db_index(ctx: &MmArc, swap: &SavedSwap) {
if let Some(conn) = ctx.sqlite_conn_opt() {
crate::database::stats_swaps::add_swap_to_index(&conn, swap)
}
#[cfg(not(feature = "new-db-arch"))]
let conn = ctx.sqlite_connection();
#[cfg(feature = "new-db-arch")]
let conn = ctx.global_db();
crate::database::stats_swaps::add_swap_to_index(&conn, swap)
}

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -1504,8 +1507,13 @@ pub async fn import_swaps(ctx: MmArc, req: Json) -> Result<Response<Vec<u8>>, St
match swap.save_to_db(&ctx).await {
Ok(_) => {
if let Some(info) = swap.get_my_info() {
#[cfg(all(not(target_arch = "wasm32"), feature = "new-db-arch"))]
let maker_address = swap.maker_address();
#[cfg(not(feature = "new-db-arch"))]
let maker_address = "no maker-address/address-dir in old DB arch";
if let Err(e) = insert_new_swap_to_db(
ctx.clone(),
maker_address,
&info.my_coin,
&info.other_coin,
*swap.uuid(),
Expand Down
18 changes: 17 additions & 1 deletion mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use uuid::Uuid;

cfg_native!(
use crate::database::my_swaps::{insert_new_swap_v2, SELECT_MY_SWAP_V2_BY_UUID};
#[cfg(feature = "new-db-arch")] use crate::database::global;
use common::async_blocking;
use db_common::sqlite::rusqlite::{named_params, Error as SqlError, Result as SqlResult, Row};
use db_common::sqlite::rusqlite::types::Type as SqlType;
Expand Down Expand Up @@ -225,8 +226,23 @@ impl StateMachineStorage for MakerSwapStorage {
let ctx = self.ctx.clone();
let id_str = id.to_string();

#[cfg(feature = "new-db-arch")]
let address_dir = global::get_maker_address_for_swap_uuid(&ctx, &id)
.await
.map_err(|e| SwapStateMachineError::StorageError(e.to_string()))?
.ok_or_else(|| SwapStateMachineError::StorageError(format!("No swap with UUID={} found.", id)))?;
#[cfg(not(feature = "new-db-arch"))]
let address_dir = String::from("address_dir doesn't have any effect in feature != new-db-arch");

let conn = ctx.address_db(&address_dir).map_err(|e| {
SwapStateMachineError::StorageError(format!(
"Failed to get address db for swap UUID={} and address_dir={}: {}",
id, address_dir, e
))
})?;

async_blocking(move || {
Ok(ctx.sqlite_connection().query_row(
Ok(conn.query_row(
SELECT_MY_SWAP_V2_BY_UUID,
&[(":uuid", &id_str)],
MakerSwapDbRepr::from_sql_row,
Expand Down
Loading
Loading