Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove memory storage and indexes #1219

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions docs/event-queue-feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@ To enable the Event Queue feature, you must add this CLI option when running the
For example:

```bash
poetry run hathor-cli run_node --memory-storage --status 8080 --testnet --enable-event-queue
poetry run hathor-cli run_node --temp-data --status 8080 --testnet --enable-event-queue
```

### First run

If this is the first time your full node is running with the event queue enabled, there are 3 possibilities:
If this is the first time your full node is running with the event queue enabled, there are 2 possibilities:

1. You're running the full node using memory storage, like in the example above;
2. You're running the full node using RocksDB storage (the default option), and
1. You're performing a sync from scratch, that is, you don't have an existing database, or
2. You're running from an existing database.
1. You're performing a sync from scratch or you're using a temporary database (like in the example above), that is, you don't have an existing database, or
2. You're running from an existing database.

For cases 1 and 2.1, the full node will start normally, events will be generated in real time while vertices are synced, and they'll be sent to the WebSocket connection accordingly, as explained below.
For case 1, the full node will start normally, events will be generated in real time while vertices are synced and they'll be sent to the WebSocket connection accordingly, as explained below.

For case 2.2, an extra loading step will be performed during full node initialization, generating events for all existing vertices in your database. This step is slower than normal full node initialization and can take several minutes. Note that this will only be necessary once — after initialization, the events generated for your database are persisted and will be used in subsequent runs.
For case 2, an extra loading step will be performed during full node initialization, generating events for all existing vertices in your database. This step is slower than normal full node initialization and can take several minutes. Note that this will only be necessary once — after initialization, the events generated for your database are persisted and will be used in subsequent runs.

### Subsequent runs when using RocksDB

Expand Down
4 changes: 2 additions & 2 deletions extras/custom_tests/side_dag/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
python -m hathor run_node_with_side_dag
--disable-logs
--testnet
--memory-storage
--temp-data
--x-localhost-only
--procname-prefix {HATHOR_PROCESS_PREFIX}
--side-dag-testnet
--side-dag-memory-storage
--side-dag-temp-data
--side-dag-x-localhost-only
--side-dag-procname-prefix {SIDE_DAG_PROCESS_PREFIX}
"""
Expand Down
167 changes: 54 additions & 113 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum, IntEnum
import tempfile
from enum import IntEnum
from typing import Any, Callable, NamedTuple, Optional, TypeAlias

from structlog import get_logger
from typing_extensions import assert_never

from hathor.checkpoint import Checkpoint
from hathor.conf.settings import HathorSettings as HathorSettingsType
from hathor.consensus import ConsensusAlgorithm
from hathor.consensus.poa import PoaBlockProducer, PoaSigner
from hathor.daa import DifficultyAdjustmentAlgorithm
from hathor.event import EventManager
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
from hathor.event.storage import EventRocksDBStorage, EventStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.execution_manager import ExecutionManager
from hathor.feature_activation.bit_signaling_service import BitSignalingService
from hathor.feature_activation.feature import Feature
from hathor.feature_activation.feature_service import FeatureService
from hathor.feature_activation.storage.feature_activation_storage import FeatureActivationStorage
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
from hathor.indexes import IndexesManager, RocksDBIndexesManager
from hathor.manager import HathorManager
from hathor.mining.cpu_mining_service import CpuMiningService
from hathor.p2p.manager import ConnectionsManager
Expand All @@ -40,12 +40,7 @@
from hathor.reactor import ReactorProtocol as Reactor
from hathor.storage import RocksDBStorage
from hathor.stratum import StratumFactory
from hathor.transaction.storage import (
TransactionCacheStorage,
TransactionMemoryStorage,
TransactionRocksDBStorage,
TransactionStorage,
)
from hathor.transaction.storage import TransactionCacheStorage, TransactionRocksDBStorage, TransactionStorage
from hathor.transaction.vertex_parser import VertexParser
from hathor.util import Random, get_environment_info
from hathor.verification.verification_service import VerificationService
Expand Down Expand Up @@ -87,11 +82,6 @@ def add_factories(
p2p_manager.enable_sync_version(SyncVersion.V2)


class StorageType(Enum):
MEMORY = 'memory'
ROCKSDB = 'rocksdb'


class BuildArtifacts(NamedTuple):
"""Artifacts created by a builder."""
peer: PrivatePeer
Expand All @@ -107,7 +97,7 @@ class BuildArtifacts(NamedTuple):
bit_signaling_service: BitSignalingService
indexes: Optional[IndexesManager]
wallet: Optional[BaseWallet]
rocksdb_storage: Optional[RocksDBStorage]
rocksdb_storage: RocksDBStorage
stratum_factory: Optional[StratumFactory]


Expand All @@ -123,7 +113,7 @@ class Builder:
Example:

builder = Builder()
builder.use_memory()
builder.enable_event_queue()
artifacts = builder.build()
"""
def __init__(self) -> None:
Expand All @@ -138,9 +128,6 @@ def __init__(self) -> None:
self._peer: Optional[PrivatePeer] = None
self._cmdline: str = ''

self._storage_type: StorageType = StorageType.MEMORY
self._force_memory_index: bool = False

self._event_manager: Optional[EventManager] = None
self._enable_event_queue: Optional[bool] = None

Expand All @@ -156,7 +143,7 @@ def __init__(self) -> None:
self._vertex_verifiers_builder: _VertexVerifiersBuilder | None = None
self._verification_service: Optional[VerificationService] = None

self._rocksdb_path: Optional[str] = None
self._rocksdb_path: str | tempfile.TemporaryDirectory | None = None
self._rocksdb_storage: Optional[RocksDBStorage] = None
self._rocksdb_cache_capacity: Optional[int] = None

Expand All @@ -182,8 +169,6 @@ def __init__(self) -> None:

self._enable_stratum_server: Optional[bool] = None

self._full_verification: Optional[bool] = None

self._soft_voided_tx_ids: Optional[set[bytes]] = None

self._execution_manager: ExecutionManager | None = None
Expand Down Expand Up @@ -219,6 +204,7 @@ def build(self) -> BuildArtifacts:
event_manager = self._get_or_create_event_manager()
indexes = self._get_or_create_indexes_manager()
tx_storage = self._get_or_create_tx_storage()
rocksdb_storage = self._get_or_create_rocksdb_storage()
feature_service = self._get_or_create_feature_service()
bit_signaling_service = self._get_or_create_bit_signaling_service()
verification_service = self._get_or_create_verification_service()
Expand All @@ -239,9 +225,6 @@ def build(self) -> BuildArtifacts:

kwargs: dict[str, Any] = {}

if self._full_verification is not None:
kwargs['full_verification'] = self._full_verification

if self._enable_event_queue is not None:
kwargs['enable_event_queue'] = self._enable_event_queue

Expand Down Expand Up @@ -290,7 +273,7 @@ def build(self) -> BuildArtifacts:
tx_storage=tx_storage,
indexes=indexes,
wallet=wallet,
rocksdb_storage=self._rocksdb_storage,
rocksdb_storage=rocksdb_storage,
stratum_factory=stratum_factory,
feature_service=feature_service,
bit_signaling_service=bit_signaling_service
Expand Down Expand Up @@ -388,20 +371,11 @@ def _create_stratum_server(self, manager: HathorManager) -> StratumFactory:
return stratum_factory

def _get_or_create_rocksdb_storage(self) -> RocksDBStorage:
assert self._rocksdb_path is not None

if self._rocksdb_storage is not None:
return self._rocksdb_storage

kwargs = {}
if self._rocksdb_cache_capacity is not None:
kwargs = dict(cache_capacity=self._rocksdb_cache_capacity)

self._rocksdb_storage = RocksDBStorage(
path=self._rocksdb_path,
**kwargs
)

if self._rocksdb_storage is None:
self._rocksdb_storage = RocksDBStorage(
path=self._rocksdb_path,
cache_capacity=self._rocksdb_cache_capacity,
)
return self._rocksdb_storage

def _get_or_create_p2p_manager(self) -> ConnectionsManager:
Expand Down Expand Up @@ -433,19 +407,12 @@ def _get_or_create_p2p_manager(self) -> ConnectionsManager:
return self._p2p_manager

def _get_or_create_indexes_manager(self) -> IndexesManager:
if self._indexes_manager is not None:
return self._indexes_manager

if self._force_memory_index or self._storage_type == StorageType.MEMORY:
self._indexes_manager = MemoryIndexesManager(settings=self._get_or_create_settings())

elif self._storage_type == StorageType.ROCKSDB:
if self._indexes_manager is None:
rocksdb_storage = self._get_or_create_rocksdb_storage()
self._indexes_manager = RocksDBIndexesManager(rocksdb_storage)

else:
raise NotImplementedError

self._indexes_manager = RocksDBIndexesManager(
rocksdb_storage,
settings=self._get_or_create_settings(),
)
return self._indexes_manager

def _get_or_create_tx_storage(self) -> TransactionStorage:
Expand All @@ -461,21 +428,14 @@ def _get_or_create_tx_storage(self) -> TransactionStorage:
if self._tx_storage_cache:
store_indexes = None

if self._storage_type == StorageType.MEMORY:
self._tx_storage = TransactionMemoryStorage(indexes=store_indexes, settings=settings)

elif self._storage_type == StorageType.ROCKSDB:
rocksdb_storage = self._get_or_create_rocksdb_storage()
vertex_parser = self._get_or_create_vertex_parser()
self._tx_storage = TransactionRocksDBStorage(
rocksdb_storage,
indexes=store_indexes,
settings=settings,
vertex_parser=vertex_parser,
)

else:
raise NotImplementedError
rocksdb_storage = self._get_or_create_rocksdb_storage()
vertex_parser = self._get_or_create_vertex_parser()
self._tx_storage = TransactionRocksDBStorage(
rocksdb_storage,
indexes=store_indexes,
settings=settings,
vertex_parser=vertex_parser,
)

if self._tx_storage_cache:
reactor = self._get_reactor()
Expand All @@ -489,16 +449,9 @@ def _get_or_create_tx_storage(self) -> TransactionStorage:
return self._tx_storage

def _get_or_create_event_storage(self) -> EventStorage:
if self._event_storage is not None:
pass
elif self._storage_type == StorageType.MEMORY:
self._event_storage = EventMemoryStorage()
elif self._storage_type == StorageType.ROCKSDB:
if self._event_storage is None:
rocksdb_storage = self._get_or_create_rocksdb_storage()
self._event_storage = EventRocksDBStorage(rocksdb_storage)
else:
raise NotImplementedError

return self._event_storage

def _get_or_create_event_manager(self) -> EventManager:
Expand Down Expand Up @@ -562,14 +515,11 @@ def _get_or_create_verification_service(self) -> VerificationService:

return self._verification_service

def _get_or_create_feature_storage(self) -> FeatureActivationStorage | None:
match self._storage_type:
case StorageType.MEMORY: return None
case StorageType.ROCKSDB: return FeatureActivationStorage(
settings=self._get_or_create_settings(),
rocksdb_storage=self._get_or_create_rocksdb_storage()
)
case _: assert_never(self._storage_type)
def _get_or_create_feature_storage(self) -> FeatureActivationStorage:
return FeatureActivationStorage(
settings=self._get_or_create_settings(),
rocksdb_storage=self._get_or_create_rocksdb_storage()
)

def _get_or_create_vertex_verifiers(self) -> VertexVerifiers:
if self._vertex_verifiers is None:
Expand Down Expand Up @@ -638,33 +588,28 @@ def _get_or_create_poa_block_producer(self) -> PoaBlockProducer | None:

return self._poa_block_producer

def use_memory(self) -> 'Builder':
def set_rocksdb_path(self, path: str | tempfile.TemporaryDirectory) -> 'Builder':
if self._tx_storage:
raise ValueError('cannot set rocksdb path after tx storage is set')
self.check_if_can_modify()
self._storage_type = StorageType.MEMORY
self._rocksdb_path = path
return self

def use_rocksdb(
self,
path: str,
cache_capacity: Optional[int] = None
) -> 'Builder':
def set_rocksdb_cache_capacity(self, cache_capacity: int) -> 'Builder':
if self._tx_storage:
raise ValueError('cannot set rocksdb cache capacity after tx storage is set')
self.check_if_can_modify()
self._storage_type = StorageType.ROCKSDB
self._rocksdb_path = path
self._rocksdb_cache_capacity = cache_capacity
return self

def use_tx_storage_cache(self, capacity: Optional[int] = None) -> 'Builder':
if self._tx_storage:
raise ValueError('cannot set tx storage cache capacity after tx storage is set')
self.check_if_can_modify()
self._tx_storage_cache = True
self._tx_storage_cache_capacity = capacity
return self

def force_memory_index(self) -> 'Builder':
self.check_if_can_modify()
self._force_memory_index = True
return self

def _get_or_create_wallet(self) -> Optional[BaseWallet]:
if self._wallet is not None:
return self._wallet
Expand Down Expand Up @@ -693,21 +638,29 @@ def enable_stratum_server(self) -> 'Builder':
return self

def enable_address_index(self) -> 'Builder':
if self._tx_storage or self._indexes_manager:
raise ValueError('cannot enable index after tx storage or indexes manager is set')
self.check_if_can_modify()
self._enable_address_index = True
return self

def enable_tokens_index(self) -> 'Builder':
if self._tx_storage or self._indexes_manager:
raise ValueError('cannot enable index after tx storage or indexes manager is set')
self.check_if_can_modify()
self._enable_tokens_index = True
return self

def enable_utxo_index(self) -> 'Builder':
if self._tx_storage or self._indexes_manager:
raise ValueError('cannot enable index after tx storage or indexes manager is set')
self.check_if_can_modify()
self._enable_utxo_index = True
return self

def enable_wallet_index(self) -> 'Builder':
if self._tx_storage or self._indexes_manager:
raise ValueError('cannot enable index after tx storage or indexes manager is set')
self.check_if_can_modify()
self.enable_address_index()
self.enable_tokens_index()
Expand All @@ -721,6 +674,9 @@ def enable_event_queue(self) -> 'Builder':
def set_tx_storage(self, tx_storage: TransactionStorage) -> 'Builder':
self.check_if_can_modify()
self._tx_storage = tx_storage
internal = tx_storage.store if isinstance(tx_storage, TransactionCacheStorage) else tx_storage
assert isinstance(internal, TransactionRocksDBStorage)
self._rocksdb_storage = internal._rocksdb_storage
return self

def set_event_storage(self, event_storage: EventStorage) -> 'Builder':
Expand Down Expand Up @@ -778,21 +734,6 @@ def disable_sync_v2(self) -> 'Builder':
self._sync_v2_support = SyncSupportLevel.DISABLED
return self

def set_full_verification(self, full_verification: bool) -> 'Builder':
self.check_if_can_modify()
self._full_verification = full_verification
return self

def enable_full_verification(self) -> 'Builder':
self.check_if_can_modify()
self._full_verification = True
return self

def disable_full_verification(self) -> 'Builder':
self.check_if_can_modify()
self._full_verification = False
return self

def enable_ipv6(self) -> 'Builder':
self.check_if_can_modify()
self._enable_ipv6 = True
Expand Down
Loading