From 226d4b2651e2dd3aac9ad3673c7d727194eaf8f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20T=C3=B3th?= Date: Wed, 27 Sep 2023 20:02:51 +0200 Subject: [PATCH] Extract modules `kompile` and `prove` (#51) * Move `_init_and_run_proof` into `run_kcfg_group` * Move `run_cfg_group` out of `foundry_prove` * Rename functions * Extract module `kontrol.prove` * Rename function * Extract module `kontrol.kompile` * Move import to top * Set Version: 0.1.9 --------- Co-authored-by: devops --- package/version | 2 +- pyproject.toml | 2 +- src/kontrol/__main__.py | 4 +- src/kontrol/foundry.py | 629 +------------------- src/kontrol/kompile.py | 188 ++++++ src/kontrol/prove.py | 539 +++++++++++++++++ src/tests/integration/test_foundry_prove.py | 12 +- src/tests/profiling/test_foundry_prove.py | 3 +- 8 files changed, 743 insertions(+), 636 deletions(-) create mode 100644 src/kontrol/kompile.py create mode 100644 src/kontrol/prove.py diff --git a/package/version b/package/version index 699c6c6d4..1a030947e 100644 --- a/package/version +++ b/package/version @@ -1 +1 @@ -0.1.8 +0.1.9 diff --git a/pyproject.toml b/pyproject.toml index cbe5773e6..57bcd50c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "kontrol" -version = "0.1.8" +version = "0.1.9" description = "Foundry integration for KEVM" authors = [ "Runtime Verification, Inc. ", diff --git a/src/kontrol/__main__.py b/src/kontrol/__main__.py index 416d422b8..f7f518a9b 100644 --- a/src/kontrol/__main__.py +++ b/src/kontrol/__main__.py @@ -16,11 +16,9 @@ from .foundry import ( Foundry, foundry_get_model, - foundry_kompile, foundry_list, foundry_merge_nodes, foundry_node_printer, - foundry_prove, foundry_remove_node, foundry_section_edge, foundry_show, @@ -28,6 +26,8 @@ foundry_step_node, foundry_to_dot, ) +from .kompile import foundry_kompile +from .prove import foundry_prove from .solc_to_k import solc_compile, solc_to_k if TYPE_CHECKING: diff --git a/src/kontrol/foundry.py b/src/kontrol/foundry.py index 7a8277edb..a1bf447bc 100644 --- a/src/kontrol/foundry.py +++ b/src/kontrol/foundry.py @@ -4,7 +4,6 @@ import logging import os import re -import shutil import sys from functools import cached_property from os import listdir @@ -13,47 +12,33 @@ from typing import TYPE_CHECKING import tomlkit -from kevm_pyk.dist import DistTarget from kevm_pyk.kevm import KEVM, KEVMNodePrinter, KEVMSemantics -from kevm_pyk.utils import ( - KDefinition__expand_macros, - abstract_cell_vars, - byte_offset_to_lines, - constraints_for, - kevm_prove, - legacy_explore, - print_failure_info, - print_model, -) -from pathos.pools import ProcessPool # type: ignore -from pyk.cterm import CTerm -from pyk.kast.inner import KApply, KSequence, KSort, KToken, KVariable, Subst -from pyk.kast.manip import flatten_label, free_vars, minimize_term, set_cell -from pyk.kast.outer import KDefinition, KFlatModule, KImport, KRequire +from kevm_pyk.utils import byte_offset_to_lines, legacy_explore, print_failure_info, print_model +from pyk.kast.inner import KApply, KSort, KToken +from pyk.kast.manip import minimize_term from pyk.kcfg import KCFG from pyk.prelude.bytes import bytesToken -from pyk.prelude.k import GENERATED_TOP_CELL -from pyk.prelude.kbool import FALSE, notBool +from pyk.prelude.kbool import notBool from pyk.prelude.kint import INT, intToken -from pyk.prelude.ml import mlEqualsTrue from pyk.proof.proof import Proof from pyk.proof.reachability import APRBMCProof, APRProof from pyk.proof.show import APRBMCProofNodePrinter, APRProofNodePrinter, APRProofShow from pyk.utils import ensure_dir_path, hash_str, run_process, single, unique -from .solc_to_k import Contract, contract_to_main_module, contract_to_verification_module +from .solc_to_k import Contract if TYPE_CHECKING: from collections.abc import Iterable from typing import Any, Final + from pyk.cterm import CTerm from pyk.kast.inner import KInner - from pyk.kcfg import KCFGExplore from pyk.kcfg.kcfg import NodeIdLike from pyk.kcfg.tui import KCFGElem from pyk.proof.show import NodePrinter from pyk.utils import BugReport + _LOGGER: Final = logging.getLogger(__name__) @@ -474,288 +459,6 @@ def free_proof_version( return latest_version + 1 if latest_version is not None else 0 -def foundry_kompile( - foundry_root: Path, - includes: Iterable[str], - regen: bool = False, - rekompile: bool = False, - requires: Iterable[str] = (), - imports: Iterable[str] = (), - ccopts: Iterable[str] = (), - llvm_kompile: bool = True, - debug: bool = False, - verbose: bool = False, -) -> None: - from kevm_pyk.kompile import KompileTarget, kevm_kompile - - syntax_module = 'FOUNDRY-CONTRACTS' - foundry = Foundry(foundry_root) - foundry_requires_dir = foundry.kompiled / 'requires' - foundry_contracts_file = foundry.kompiled / 'contracts.k' - kompiled_timestamp = foundry.kompiled / 'timestamp' - main_module = 'FOUNDRY-MAIN' - ensure_dir_path(foundry.kompiled) - ensure_dir_path(foundry_requires_dir) - - requires_paths: dict[str, str] = {} - - foundry.build() - - if not foundry.up_to_date(): - _LOGGER.info('Detected updates to contracts, regenerating K definition.') - regen = True - - for r in requires: - req = Path(r) - if not req.exists(): - raise ValueError(f'No such file: {req}') - if req.name in requires_paths.keys(): - raise ValueError( - f'Required K files have conflicting names: {r} and {requires_paths[req.name]}. Consider changing the name of one of these files.' - ) - requires_paths[req.name] = r - req_path = foundry_requires_dir / req.name - if regen or not req_path.exists(): - _LOGGER.info(f'Copying requires path: {req} -> {req_path}') - shutil.copy(req, req_path) - regen = True - - _imports: dict[str, list[str]] = {contract.name: [] for contract in foundry.contracts.values()} - for i in imports: - imp = i.split(':') - if not len(imp) == 2: - raise ValueError(f'module imports must be of the form "[ContractName]:[MODULE-NAME]". Got: {i}') - if imp[0] in _imports: - _imports[imp[0]].append(imp[1]) - else: - raise ValueError(f'Could not find contract: {imp[0]}') - - if regen or not foundry_contracts_file.exists() or not foundry.main_file.exists(): - copied_requires = [] - copied_requires += [f'requires/{name}' for name in list(requires_paths.keys())] - imports = ['FOUNDRY'] - kevm = KEVM(DistTarget.FOUNDRY.get()) - empty_config = kevm.definition.empty_config(Foundry.Sorts.FOUNDRY_CELL) - bin_runtime_definition = _foundry_to_contract_def( - empty_config=empty_config, - contracts=foundry.contracts.values(), - requires=['foundry.md'], - ) - - contract_main_definition = _foundry_to_main_def( - main_module=main_module, - empty_config=empty_config, - contracts=foundry.contracts.values(), - requires=(['contracts.k'] + copied_requires), - imports=_imports, - ) - - kevm = KEVM( - DistTarget.FOUNDRY.get(), - extra_unparsing_modules=(bin_runtime_definition.all_modules + contract_main_definition.all_modules), - ) - foundry_contracts_file.write_text(kevm.pretty_print(bin_runtime_definition, unalias=False) + '\n') - _LOGGER.info(f'Wrote file: {foundry_contracts_file}') - foundry.main_file.write_text(kevm.pretty_print(contract_main_definition) + '\n') - _LOGGER.info(f'Wrote file: {foundry.main_file}') - - def kompilation_digest() -> str: - k_files = list(requires) + [foundry_contracts_file, foundry.main_file] - return hash_str(''.join([hash_str(Path(k_file).read_text()) for k_file in k_files])) - - def kompilation_up_to_date() -> bool: - if not foundry.digest_file.exists(): - return False - digest_dict = json.loads(foundry.digest_file.read_text()) - if 'kompilation' not in digest_dict: - digest_dict['kompilation'] = '' - foundry.digest_file.write_text(json.dumps(digest_dict, indent=4)) - return digest_dict['kompilation'] == kompilation_digest() - - def update_kompilation_digest() -> None: - digest_dict = {} - if foundry.digest_file.exists(): - digest_dict = json.loads(foundry.digest_file.read_text()) - digest_dict['kompilation'] = kompilation_digest() - foundry.digest_file.write_text(json.dumps(digest_dict, indent=4)) - - _LOGGER.info('Updated Kompilation digest') - - if not kompilation_up_to_date() or rekompile or not kompiled_timestamp.exists(): - kevm_kompile( - target=KompileTarget.HASKELL_BOOSTER, - output_dir=foundry.kompiled, - main_file=foundry.main_file, - main_module=main_module, - syntax_module=syntax_module, - includes=[include for include in includes if Path(include).exists()], - emit_json=True, - ccopts=ccopts, - llvm_library=foundry.llvm_library, - debug=debug, - verbose=verbose, - ) - - update_kompilation_digest() - foundry.update_digest() - - -def foundry_prove( - foundry_root: Path, - max_depth: int = 1000, - max_iterations: int | None = None, - reinit: bool = False, - tests: Iterable[tuple[str, int | None]] = (), - exclude_tests: Iterable[str] = (), - workers: int = 1, - simplify_init: bool = True, - break_every_step: bool = False, - break_on_jumpi: bool = False, - break_on_calls: bool = True, - bmc_depth: int | None = None, - bug_report: BugReport | None = None, - kore_rpc_command: str | Iterable[str] | None = None, - use_booster: bool = False, - smt_timeout: int | None = None, - smt_retry_limit: int | None = None, - failure_info: bool = True, - counterexample_info: bool = False, - trace_rewrites: bool = False, - auto_abstract_gas: bool = False, - port: int | None = None, -) -> dict[tuple[str, int], tuple[bool, list[str] | None]]: - if workers <= 0: - raise ValueError(f'Must have at least one worker, found: --workers {workers}') - if max_iterations is not None and max_iterations < 0: - raise ValueError(f'Must have a non-negative number of iterations, found: --max-iterations {max_iterations}') - - foundry = Foundry(foundry_root, bug_report=bug_report) - - foundry.mk_proofs_dir() - - if use_booster: - try: - run_process(('which', 'kore-rpc-booster'), pipe_stderr=True).stdout.strip() - except CalledProcessError: - raise RuntimeError( - "Couldn't locate the kore-rpc-booster RPC binary. Please put 'kore-rpc-booster' on PATH manually or using kup install/kup shell." - ) from None - - if kore_rpc_command is None: - kore_rpc_command = ('kore-rpc-booster',) if use_booster else ('kore-rpc',) - - if not tests: - tests = [(test, None) for test in foundry.all_tests] - tests = list({(foundry.matching_sig(test), version) for test, version in tests}) - test_names = [test[0] for test in tests] - - _LOGGER.info(f'Running tests: {test_names}') - - contracts = set(unique({test.split('.')[0] for test in test_names})) - - setup_methods = set( - unique( - f'{contract_name}.setUp()' - for contract_name in contracts - if 'setUp' in foundry.contracts[contract_name].method_by_name - ) - ) - - def _init_and_run_proof(_init_problem: tuple[str, str, int]) -> tuple[bool, list[str] | None]: - contract_name, method_sig, version = _init_problem - contract = foundry.contracts[contract_name] - method = contract.method_by_sig[method_sig] - test_id = f'{contract_name}.{method_sig}:{version}' - llvm_definition_dir = foundry.llvm_library if use_booster else None - - start_server = port is None - - with legacy_explore( - foundry.kevm, - kcfg_semantics=KEVMSemantics(auto_abstract_gas=auto_abstract_gas), - id=test_id, - bug_report=bug_report, - kore_rpc_command=kore_rpc_command, - llvm_definition_dir=llvm_definition_dir, - smt_timeout=smt_timeout, - smt_retry_limit=smt_retry_limit, - trace_rewrites=trace_rewrites, - start_server=start_server, - port=port, - ) as kcfg_explore: - proof = _method_to_apr_proof( - foundry, - contract, - method, - foundry.proofs_dir, - kcfg_explore, - test_id, - simplify_init=simplify_init, - bmc_depth=bmc_depth, - ) - - passed = kevm_prove( - foundry.kevm, - proof, - kcfg_explore, - max_depth=max_depth, - max_iterations=max_iterations, - break_every_step=break_every_step, - break_on_jumpi=break_on_jumpi, - break_on_calls=break_on_calls, - ) - failure_log = None - if not passed: - failure_log = print_failure_info(proof, kcfg_explore, counterexample_info) - return passed, failure_log - - def run_cfg_group(tests: list[tuple[str, int]]) -> dict[tuple[str, int], tuple[bool, list[str] | None]]: - def _split_test(test: tuple[str, int]) -> tuple[str, str, int]: - test_name, version = test - contract, method = test_name.split('.') - return contract, method, version - - init_problems = [_split_test(test) for test in tests] - - _apr_proofs: list[tuple[bool, list[str] | None]] - if workers > 1: - with ProcessPool(ncpus=workers) as process_pool: - _apr_proofs = process_pool.map(_init_and_run_proof, init_problems) - else: - _apr_proofs = [] - for init_problem in init_problems: - _apr_proofs.append(_init_and_run_proof(init_problem)) - - apr_proofs = dict(zip(tests, _apr_proofs, strict=True)) - return apr_proofs - - tests_with_versions = [ - (test_name, foundry.resolve_proof_version(test_name, reinit, version)) for (test_name, version) in tests - ] - setup_methods_with_versions = [ - (setup_method_name, foundry.resolve_proof_version(setup_method_name, reinit, None)) - for setup_method_name in setup_methods - ] - - _LOGGER.info(f'Updating digests: {[test_name for test_name, _ in tests]}') - for test_name, _ in tests: - foundry.get_method(test_name).update_digest(foundry.digest_file) - _LOGGER.info(f'Updating digests: {setup_methods}') - for test_name in setup_methods: - foundry.get_method(test_name).update_digest(foundry.digest_file) - - _LOGGER.info(f'Running setup functions in parallel: {list(setup_methods)}') - results = run_cfg_group(setup_methods_with_versions) - failed = [setup_cfg for setup_cfg, passed in results.items() if not passed] - if failed: - raise ValueError(f'Running setUp method failed for {len(failed)} contracts: {failed}') - - _LOGGER.info(f'Running test functions in parallel: {test_names}') - results = run_cfg_group(tests_with_versions) - - return results - - def foundry_show( foundry_root: Path, test: str, @@ -1076,324 +779,6 @@ def _write_cfg(cfg: KCFG, path: Path) -> None: _LOGGER.info(f'Updated CFG file: {path}') -def _foundry_to_contract_def( - empty_config: KInner, - contracts: Iterable[Contract], - requires: Iterable[str], -) -> KDefinition: - modules = [contract_to_main_module(contract, empty_config, imports=['FOUNDRY']) for contract in contracts] - # First module is chosen as main module arbitrarily, since the contract definition is just a set of - # contract modules. - main_module = Contract.contract_to_module_name(list(contracts)[0].name_upper) - - return KDefinition( - main_module, - modules, - requires=(KRequire(req) for req in list(requires)), - ) - - -def _foundry_to_main_def( - main_module: str, - contracts: Iterable[Contract], - empty_config: KInner, - requires: Iterable[str], - imports: dict[str, list[str]], -) -> KDefinition: - modules = [ - contract_to_verification_module(contract, empty_config, imports=imports[contract.name]) - for contract in contracts - ] - _main_module = KFlatModule( - main_module, - imports=(KImport(mname) for mname in [_m.name for _m in modules]), - ) - - return KDefinition( - main_module, - [_main_module] + modules, - requires=(KRequire(req) for req in list(requires)), - ) - - -def _method_to_apr_proof( - foundry: Foundry, - contract: Contract, - method: Contract.Method, - save_directory: Path, - kcfg_explore: KCFGExplore, - test_id: str, - simplify_init: bool = True, - bmc_depth: int | None = None, -) -> APRProof | APRBMCProof: - contract_name = contract.name - method_sig = method.signature - if Proof.proof_data_exists(test_id, save_directory): - apr_proof = foundry.get_apr_proof(test_id) - else: - _LOGGER.info(f'Initializing KCFG for test: {test_id}') - - setup_digest = None - if method_sig != 'setUp()' and 'setUp' in contract.method_by_name: - latest_version = foundry.latest_proof_version(f'{contract.name}.setUp()') - setup_digest = f'{contract_name}.setUp():{latest_version}' - _LOGGER.info(f'Using setUp method for test: {test_id}') - - empty_config = foundry.kevm.definition.empty_config(GENERATED_TOP_CELL) - kcfg, init_node_id, target_node_id = _method_to_cfg( - empty_config, contract, method, save_directory, init_state=setup_digest - ) - - _LOGGER.info(f'Expanding macros in initial state for test: {test_id}') - init_term = kcfg.node(init_node_id).cterm.kast - init_term = KDefinition__expand_macros(foundry.kevm.definition, init_term) - init_cterm = CTerm.from_kast(init_term) - _LOGGER.info(f'Computing definedness constraint for test: {test_id}') - init_cterm = kcfg_explore.cterm_assume_defined(init_cterm) - kcfg.replace_node(init_node_id, init_cterm) - - _LOGGER.info(f'Expanding macros in target state for test: {test_id}') - target_term = kcfg.node(target_node_id).cterm.kast - target_term = KDefinition__expand_macros(foundry.kevm.definition, target_term) - target_cterm = CTerm.from_kast(target_term) - kcfg.replace_node(target_node_id, target_cterm) - - if simplify_init: - _LOGGER.info(f'Simplifying KCFG for test: {test_id}') - kcfg_explore.simplify(kcfg, {}) - if bmc_depth is not None: - apr_proof = APRBMCProof( - test_id, - kcfg, - [], - init_node_id, - target_node_id, - {}, - bmc_depth, - proof_dir=save_directory, - ) - else: - apr_proof = APRProof(test_id, kcfg, [], init_node_id, target_node_id, {}, proof_dir=save_directory) - - apr_proof.write_proof_data() - return apr_proof - - -def _method_to_cfg( - empty_config: KInner, - contract: Contract, - method: Contract.Method, - kcfgs_dir: Path, - init_state: str | None = None, -) -> tuple[KCFG, NodeIdLike, NodeIdLike]: - calldata = method.calldata_cell(contract) - callvalue = method.callvalue_cell - init_cterm = _init_cterm( - empty_config, - contract.name, - kcfgs_dir, - calldata=calldata, - callvalue=callvalue, - init_state=init_state, - ) - is_test = method.name.startswith('test') - failing = method.name.startswith('testFail') - final_cterm = _final_cterm(empty_config, contract.name, failing=failing, is_test=is_test) - - cfg = KCFG() - init_node = cfg.create_node(init_cterm) - target_node = cfg.create_node(final_cterm) - - return cfg, init_node.id, target_node.id - - -def get_final_accounts_cell( - proof_id: str, proof_dir: Path, overwrite_code_cell: KInner | None = None -) -> tuple[KInner, Iterable[KInner]]: - apr_proof = APRProof.read_proof_data(proof_dir, proof_id) - target = apr_proof.kcfg.node(apr_proof.target) - target_states = apr_proof.kcfg.covers(target_id=target.id) - if len(target_states) == 0: - raise ValueError( - f'setUp() function for {apr_proof.id} did not reach the end of execution. Maybe --max-iterations is too low?' - ) - if len(target_states) > 1: - raise ValueError(f'setUp() function for {apr_proof.id} branched and has {len(target_states)} target states.') - cterm = single(target_states).source.cterm - acct_cell = cterm.cell('ACCOUNTS_CELL') - - if overwrite_code_cell is not None: - new_accounts = [CTerm(account, []) for account in flatten_label('_AccountCellMap_', acct_cell)] - new_accounts_map = {account.cell('ACCTID_CELL'): account for account in new_accounts} - test_contract_account = new_accounts_map[Foundry.address_TEST_CONTRACT()] - - new_accounts_map[Foundry.address_TEST_CONTRACT()] = CTerm( - set_cell(test_contract_account.config, 'CODE_CELL', overwrite_code_cell), - [], - ) - - acct_cell = KEVM.accounts([account.config for account in new_accounts_map.values()]) - - fvars = free_vars(acct_cell) - acct_cons = constraints_for(fvars, cterm.constraints) - return (acct_cell, acct_cons) - - -def _init_cterm( - empty_config: KInner, - contract_name: str, - kcfgs_dir: Path, - *, - calldata: KInner | None = None, - callvalue: KInner | None = None, - init_state: str | None = None, -) -> CTerm: - program = KEVM.bin_runtime(KApply(f'contract_{contract_name}')) - account_cell = KEVM.account_cell( - Foundry.address_TEST_CONTRACT(), - intToken(0), - program, - KApply('.Map'), - KApply('.Map'), - intToken(1), - ) - init_subst = { - 'MODE_CELL': KApply('NORMAL'), - 'SCHEDULE_CELL': KApply('SHANGHAI_EVM'), - 'STATUSCODE_CELL': KVariable('STATUSCODE'), - 'CALLSTACK_CELL': KApply('.List'), - 'CALLDEPTH_CELL': intToken(0), - 'PROGRAM_CELL': program, - 'JUMPDESTS_CELL': KEVM.compute_valid_jumpdests(program), - 'ORIGIN_CELL': KVariable('ORIGIN_ID'), - 'LOG_CELL': KApply('.List'), - 'ID_CELL': Foundry.address_TEST_CONTRACT(), - 'CALLER_CELL': KVariable('CALLER_ID'), - 'ACCESSEDACCOUNTS_CELL': KApply('.Set'), - 'ACCESSEDSTORAGE_CELL': KApply('.Map'), - 'INTERIMSTATES_CELL': KApply('.List'), - 'LOCALMEM_CELL': KApply('.Bytes_BYTES-HOOKED_Bytes'), - 'PREVCALLER_CELL': KApply('.Account_EVM-TYPES_Account'), - 'PREVORIGIN_CELL': KApply('.Account_EVM-TYPES_Account'), - 'NEWCALLER_CELL': KApply('.Account_EVM-TYPES_Account'), - 'NEWORIGIN_CELL': KApply('.Account_EVM-TYPES_Account'), - 'ACTIVE_CELL': FALSE, - 'STATIC_CELL': FALSE, - 'MEMORYUSED_CELL': intToken(0), - 'WORDSTACK_CELL': KApply('.WordStack_EVM-TYPES_WordStack'), - 'PC_CELL': intToken(0), - 'GAS_CELL': intToken(9223372036854775807), - 'K_CELL': KSequence([KEVM.sharp_execute(), KVariable('CONTINUATION')]), - 'ACCOUNTS_CELL': KEVM.accounts( - [ - account_cell, # test contract address - Foundry.account_CHEATCODE_ADDRESS(KApply('.Map')), - ] - ), - 'SINGLECALL_CELL': FALSE, - 'ISREVERTEXPECTED_CELL': FALSE, - 'ISOPCODEEXPECTED_CELL': FALSE, - 'EXPECTEDADDRESS_CELL': KApply('.Account_EVM-TYPES_Account'), - 'EXPECTEDVALUE_CELL': intToken(0), - 'EXPECTEDDATA_CELL': KApply('.Bytes_BYTES-HOOKED_Bytes'), - 'OPCODETYPE_CELL': KApply('.OpcodeType_FOUNDRY-CHEAT-CODES_OpcodeType'), - 'RECORDEVENT_CELL': FALSE, - 'ISEVENTEXPECTED_CELL': FALSE, - 'ISCALLWHITELISTACTIVE_CELL': FALSE, - 'ISSTORAGEWHITELISTACTIVE_CELL': FALSE, - 'ADDRESSSET_CELL': KApply('.Set'), - 'STORAGESLOTSET_CELL': KApply('.Set'), - } - - constraints = None - if init_state: - accts, constraints = get_final_accounts_cell(init_state, kcfgs_dir, overwrite_code_cell=program) - init_subst['ACCOUNTS_CELL'] = accts - - if calldata is not None: - init_subst['CALLDATA_CELL'] = calldata - - if callvalue is not None: - init_subst['CALLVALUE_CELL'] = callvalue - - init_term = Subst(init_subst)(empty_config) - init_cterm = CTerm.from_kast(init_term) - init_cterm = KEVM.add_invariant(init_cterm) - if constraints is None: - return init_cterm - else: - for constraint in constraints: - init_cterm = init_cterm.add_constraint(constraint) - return init_cterm - - -def _final_cterm(empty_config: KInner, contract_name: str, *, failing: bool, is_test: bool = True) -> CTerm: - final_term = _final_term(empty_config, contract_name) - dst_failed_post = KEVM.lookup(KVariable('CHEATCODE_STORAGE_FINAL'), Foundry.loc_FOUNDRY_FAILED()) - foundry_success = Foundry.success( - KVariable('STATUSCODE_FINAL'), - dst_failed_post, - KVariable('ISREVERTEXPECTED_FINAL'), - KVariable('ISOPCODEEXPECTED_FINAL'), - KVariable('RECORDEVENT_FINAL'), - KVariable('ISEVENTEXPECTED_FINAL'), - ) - final_cterm = CTerm.from_kast(final_term) - if is_test: - if not failing: - return final_cterm.add_constraint(mlEqualsTrue(foundry_success)) - else: - return final_cterm.add_constraint(mlEqualsTrue(notBool(foundry_success))) - return final_cterm - - -def _final_term(empty_config: KInner, contract_name: str) -> KInner: - program = KEVM.bin_runtime(KApply(f'contract_{contract_name}')) - post_account_cell = KEVM.account_cell( - Foundry.address_TEST_CONTRACT(), - KVariable('ACCT_BALANCE_FINAL'), - program, - KVariable('ACCT_STORAGE_FINAL'), - KVariable('ACCT_ORIGSTORAGE_FINAL'), - KVariable('ACCT_NONCE_FINAL'), - ) - final_subst = { - 'K_CELL': KSequence([KEVM.halt(), KVariable('CONTINUATION')]), - 'STATUSCODE_CELL': KVariable('STATUSCODE_FINAL'), - 'ID_CELL': Foundry.address_TEST_CONTRACT(), - 'ACCOUNTS_CELL': KEVM.accounts( - [ - post_account_cell, # test contract address - Foundry.account_CHEATCODE_ADDRESS(KVariable('CHEATCODE_STORAGE_FINAL')), - KVariable('ACCOUNTS_FINAL'), - ] - ), - 'ISREVERTEXPECTED_CELL': KVariable('ISREVERTEXPECTED_FINAL'), - 'ISOPCODEEXPECTED_CELL': KVariable('ISOPCODEEXPECTED_FINAL'), - 'RECORDEVENT_CELL': KVariable('RECORDEVENT_FINAL'), - 'ISEVENTEXPECTED_CELL': KVariable('ISEVENTEXPECTED_FINAL'), - 'ISCALLWHITELISTACTIVE_CELL': KVariable('ISCALLWHITELISTACTIVE_FINAL'), - 'ISSTORAGEWHITELISTACTIVE_CELL': KVariable('ISSTORAGEWHITELISTACTIVE_FINAL'), - 'ADDRESSSET_CELL': KVariable('ADDRESSSET_FINAL'), - 'STORAGESLOTSET_CELL': KVariable('STORAGESLOTSET_FINAL'), - } - return abstract_cell_vars( - Subst(final_subst)(empty_config), - [ - KVariable('STATUSCODE_FINAL'), - KVariable('ACCOUNTS_FINAL'), - KVariable('ISREVERTEXPECTED_FINAL'), - KVariable('ISOPCODEEXPECTED_FINAL'), - KVariable('RECORDEVENT_FINAL'), - KVariable('ISEVENTEXPECTED_FINAL'), - KVariable('ISCALLWHITELISTACTIVE_FINAL'), - KVariable('ISSTORAGEWHITELISTACTIVE_FINAL'), - KVariable('ADDRESSSET_FINAL'), - KVariable('STORAGESLOTSET_FINAL'), - ], - ) - - class FoundryNodePrinter(KEVMNodePrinter): foundry: Foundry contract_name: str diff --git a/src/kontrol/kompile.py b/src/kontrol/kompile.py new file mode 100644 index 000000000..5f734847a --- /dev/null +++ b/src/kontrol/kompile.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +import json +import logging +import shutil +from pathlib import Path +from typing import TYPE_CHECKING + +from kevm_pyk.dist import DistTarget +from kevm_pyk.kevm import KEVM +from kevm_pyk.kompile import KompileTarget, kevm_kompile +from pyk.kast.outer import KDefinition, KFlatModule, KImport, KRequire +from pyk.utils import ensure_dir_path, hash_str + +from .foundry import Foundry +from .solc_to_k import Contract, contract_to_main_module, contract_to_verification_module + +if TYPE_CHECKING: + from collections.abc import Iterable + from typing import Final + + from pyk.kast.inner import KInner + +_LOGGER: Final = logging.getLogger(__name__) + + +def foundry_kompile( + foundry_root: Path, + includes: Iterable[str], + regen: bool = False, + rekompile: bool = False, + requires: Iterable[str] = (), + imports: Iterable[str] = (), + ccopts: Iterable[str] = (), + llvm_kompile: bool = True, + debug: bool = False, + verbose: bool = False, +) -> None: + syntax_module = 'FOUNDRY-CONTRACTS' + foundry = Foundry(foundry_root) + foundry_requires_dir = foundry.kompiled / 'requires' + foundry_contracts_file = foundry.kompiled / 'contracts.k' + kompiled_timestamp = foundry.kompiled / 'timestamp' + main_module = 'FOUNDRY-MAIN' + ensure_dir_path(foundry.kompiled) + ensure_dir_path(foundry_requires_dir) + + requires_paths: dict[str, str] = {} + + foundry.build() + + if not foundry.up_to_date(): + _LOGGER.info('Detected updates to contracts, regenerating K definition.') + regen = True + + for r in requires: + req = Path(r) + if not req.exists(): + raise ValueError(f'No such file: {req}') + if req.name in requires_paths.keys(): + raise ValueError( + f'Required K files have conflicting names: {r} and {requires_paths[req.name]}. Consider changing the name of one of these files.' + ) + requires_paths[req.name] = r + req_path = foundry_requires_dir / req.name + if regen or not req_path.exists(): + _LOGGER.info(f'Copying requires path: {req} -> {req_path}') + shutil.copy(req, req_path) + regen = True + + _imports: dict[str, list[str]] = {contract.name: [] for contract in foundry.contracts.values()} + for i in imports: + imp = i.split(':') + if not len(imp) == 2: + raise ValueError(f'module imports must be of the form "[ContractName]:[MODULE-NAME]". Got: {i}') + if imp[0] in _imports: + _imports[imp[0]].append(imp[1]) + else: + raise ValueError(f'Could not find contract: {imp[0]}') + + if regen or not foundry_contracts_file.exists() or not foundry.main_file.exists(): + copied_requires = [] + copied_requires += [f'requires/{name}' for name in list(requires_paths.keys())] + imports = ['FOUNDRY'] + kevm = KEVM(DistTarget.FOUNDRY.get()) + empty_config = kevm.definition.empty_config(Foundry.Sorts.FOUNDRY_CELL) + bin_runtime_definition = _foundry_to_contract_def( + empty_config=empty_config, + contracts=foundry.contracts.values(), + requires=['foundry.md'], + ) + + contract_main_definition = _foundry_to_main_def( + main_module=main_module, + empty_config=empty_config, + contracts=foundry.contracts.values(), + requires=(['contracts.k'] + copied_requires), + imports=_imports, + ) + + kevm = KEVM( + DistTarget.FOUNDRY.get(), + extra_unparsing_modules=(bin_runtime_definition.all_modules + contract_main_definition.all_modules), + ) + foundry_contracts_file.write_text(kevm.pretty_print(bin_runtime_definition, unalias=False) + '\n') + _LOGGER.info(f'Wrote file: {foundry_contracts_file}') + foundry.main_file.write_text(kevm.pretty_print(contract_main_definition) + '\n') + _LOGGER.info(f'Wrote file: {foundry.main_file}') + + def kompilation_digest() -> str: + k_files = list(requires) + [foundry_contracts_file, foundry.main_file] + return hash_str(''.join([hash_str(Path(k_file).read_text()) for k_file in k_files])) + + def kompilation_up_to_date() -> bool: + if not foundry.digest_file.exists(): + return False + digest_dict = json.loads(foundry.digest_file.read_text()) + if 'kompilation' not in digest_dict: + digest_dict['kompilation'] = '' + foundry.digest_file.write_text(json.dumps(digest_dict, indent=4)) + return digest_dict['kompilation'] == kompilation_digest() + + def update_kompilation_digest() -> None: + digest_dict = {} + if foundry.digest_file.exists(): + digest_dict = json.loads(foundry.digest_file.read_text()) + digest_dict['kompilation'] = kompilation_digest() + foundry.digest_file.write_text(json.dumps(digest_dict, indent=4)) + + _LOGGER.info('Updated Kompilation digest') + + if not kompilation_up_to_date() or rekompile or not kompiled_timestamp.exists(): + kevm_kompile( + target=KompileTarget.HASKELL_BOOSTER, + output_dir=foundry.kompiled, + main_file=foundry.main_file, + main_module=main_module, + syntax_module=syntax_module, + includes=[include for include in includes if Path(include).exists()], + emit_json=True, + ccopts=ccopts, + llvm_library=foundry.llvm_library, + debug=debug, + verbose=verbose, + ) + + update_kompilation_digest() + foundry.update_digest() + + +def _foundry_to_contract_def( + empty_config: KInner, + contracts: Iterable[Contract], + requires: Iterable[str], +) -> KDefinition: + modules = [contract_to_main_module(contract, empty_config, imports=['FOUNDRY']) for contract in contracts] + # First module is chosen as main module arbitrarily, since the contract definition is just a set of + # contract modules. + main_module = Contract.contract_to_module_name(list(contracts)[0].name_upper) + + return KDefinition( + main_module, + modules, + requires=(KRequire(req) for req in list(requires)), + ) + + +def _foundry_to_main_def( + main_module: str, + contracts: Iterable[Contract], + empty_config: KInner, + requires: Iterable[str], + imports: dict[str, list[str]], +) -> KDefinition: + modules = [ + contract_to_verification_module(contract, empty_config, imports=imports[contract.name]) + for contract in contracts + ] + _main_module = KFlatModule( + main_module, + imports=(KImport(mname) for mname in [_m.name for _m in modules]), + ) + + return KDefinition( + main_module, + [_main_module] + modules, + requires=(KRequire(req) for req in list(requires)), + ) diff --git a/src/kontrol/prove.py b/src/kontrol/prove.py new file mode 100644 index 000000000..43cf88451 --- /dev/null +++ b/src/kontrol/prove.py @@ -0,0 +1,539 @@ +from __future__ import annotations + +import logging +from subprocess import CalledProcessError +from typing import TYPE_CHECKING + +from kevm_pyk.kevm import KEVM, KEVMSemantics +from kevm_pyk.utils import ( + KDefinition__expand_macros, + abstract_cell_vars, + constraints_for, + kevm_prove, + legacy_explore, + print_failure_info, +) +from pathos.pools import ProcessPool # type: ignore +from pyk.cterm import CTerm +from pyk.kast.inner import KApply, KSequence, KVariable, Subst +from pyk.kast.manip import flatten_label, free_vars, set_cell +from pyk.kcfg import KCFG +from pyk.prelude.k import GENERATED_TOP_CELL +from pyk.prelude.kbool import FALSE, notBool +from pyk.prelude.kint import intToken +from pyk.prelude.ml import mlEqualsTrue +from pyk.proof.proof import Proof +from pyk.proof.reachability import APRBMCProof, APRProof +from pyk.utils import run_process, single, unique + +from .foundry import Foundry + +if TYPE_CHECKING: + from collections.abc import Iterable + from pathlib import Path + from typing import Final + + from pyk.kast.inner import KInner + from pyk.kcfg import KCFGExplore + from pyk.kcfg.kcfg import NodeIdLike + from pyk.utils import BugReport + + from .solc_to_k import Contract + + +_LOGGER: Final = logging.getLogger(__name__) + + +def foundry_prove( + foundry_root: Path, + max_depth: int = 1000, + max_iterations: int | None = None, + reinit: bool = False, + tests: Iterable[tuple[str, int | None]] = (), + exclude_tests: Iterable[str] = (), + workers: int = 1, + simplify_init: bool = True, + break_every_step: bool = False, + break_on_jumpi: bool = False, + break_on_calls: bool = True, + bmc_depth: int | None = None, + bug_report: BugReport | None = None, + kore_rpc_command: str | Iterable[str] | None = None, + use_booster: bool = False, + smt_timeout: int | None = None, + smt_retry_limit: int | None = None, + failure_info: bool = True, + counterexample_info: bool = False, + trace_rewrites: bool = False, + auto_abstract_gas: bool = False, + port: int | None = None, +) -> dict[tuple[str, int], tuple[bool, list[str] | None]]: + if workers <= 0: + raise ValueError(f'Must have at least one worker, found: --workers {workers}') + if max_iterations is not None and max_iterations < 0: + raise ValueError(f'Must have a non-negative number of iterations, found: --max-iterations {max_iterations}') + + foundry = Foundry(foundry_root, bug_report=bug_report) + + foundry.mk_proofs_dir() + + if use_booster: + try: + run_process(('which', 'kore-rpc-booster'), pipe_stderr=True).stdout.strip() + except CalledProcessError: + raise RuntimeError( + "Couldn't locate the kore-rpc-booster RPC binary. Please put 'kore-rpc-booster' on PATH manually or using kup install/kup shell." + ) from None + + if kore_rpc_command is None: + kore_rpc_command = ('kore-rpc-booster',) if use_booster else ('kore-rpc',) + + if not tests: + tests = [(test, None) for test in foundry.all_tests] + tests = list({(foundry.matching_sig(test), version) for test, version in tests}) + test_names = [test[0] for test in tests] + + _LOGGER.info(f'Running tests: {test_names}') + + contracts = set(unique({test.split('.')[0] for test in test_names})) + + setup_methods = set( + unique( + f'{contract_name}.setUp()' + for contract_name in contracts + if 'setUp' in foundry.contracts[contract_name].method_by_name + ) + ) + + tests_with_versions = [ + (test_name, foundry.resolve_proof_version(test_name, reinit, version)) for (test_name, version) in tests + ] + setup_methods_with_versions = [ + (setup_method_name, foundry.resolve_proof_version(setup_method_name, reinit, None)) + for setup_method_name in setup_methods + ] + + _LOGGER.info(f'Updating digests: {[test_name for test_name, _ in tests]}') + for test_name, _ in tests: + foundry.get_method(test_name).update_digest(foundry.digest_file) + _LOGGER.info(f'Updating digests: {setup_methods}') + for test_name in setup_methods: + foundry.get_method(test_name).update_digest(foundry.digest_file) + + _LOGGER.info(f'Running setup functions in parallel: {list(setup_methods)}') + results = _run_cfg_group( + setup_methods_with_versions, + foundry, + max_depth=max_depth, + max_iterations=max_iterations, + workers=workers, + simplify_init=simplify_init, + break_every_step=break_every_step, + break_on_jumpi=break_on_jumpi, + break_on_calls=break_on_calls, + bmc_depth=bmc_depth, + bug_report=bug_report, + kore_rpc_command=kore_rpc_command, + use_booster=use_booster, + smt_timeout=smt_timeout, + smt_retry_limit=smt_retry_limit, + counterexample_info=counterexample_info, + trace_rewrites=trace_rewrites, + auto_abstract_gas=auto_abstract_gas, + port=port, + ) + failed = [setup_cfg for setup_cfg, passed in results.items() if not passed] + if failed: + raise ValueError(f'Running setUp method failed for {len(failed)} contracts: {failed}') + + _LOGGER.info(f'Running test functions in parallel: {test_names}') + results = _run_cfg_group( + tests_with_versions, + foundry, + max_depth=max_depth, + max_iterations=max_iterations, + workers=workers, + simplify_init=simplify_init, + break_every_step=break_every_step, + break_on_jumpi=break_on_jumpi, + break_on_calls=break_on_calls, + bmc_depth=bmc_depth, + bug_report=bug_report, + kore_rpc_command=kore_rpc_command, + use_booster=use_booster, + smt_timeout=smt_timeout, + smt_retry_limit=smt_retry_limit, + counterexample_info=counterexample_info, + trace_rewrites=trace_rewrites, + auto_abstract_gas=auto_abstract_gas, + port=port, + ) + return results + + +def _run_cfg_group( + tests: list[tuple[str, int]], + foundry: Foundry, + *, + max_depth: int, + max_iterations: int | None, + workers: int, + simplify_init: bool, + break_every_step: bool, + break_on_jumpi: bool, + break_on_calls: bool, + bmc_depth: int | None, + bug_report: BugReport | None, + kore_rpc_command: str | Iterable[str] | None, + use_booster: bool, + smt_timeout: int | None, + smt_retry_limit: int | None, + counterexample_info: bool, + trace_rewrites: bool, + auto_abstract_gas: bool, + port: int | None, +) -> dict[tuple[str, int], tuple[bool, list[str] | None]]: + def init_and_run_proof(_init_problem: tuple[str, str, int]) -> tuple[bool, list[str] | None]: + contract_name, method_sig, version = _init_problem + contract = foundry.contracts[contract_name] + method = contract.method_by_sig[method_sig] + test_id = f'{contract_name}.{method_sig}:{version}' + llvm_definition_dir = foundry.llvm_library if use_booster else None + + start_server = port is None + + with legacy_explore( + foundry.kevm, + kcfg_semantics=KEVMSemantics(auto_abstract_gas=auto_abstract_gas), + id=test_id, + bug_report=bug_report, + kore_rpc_command=kore_rpc_command, + llvm_definition_dir=llvm_definition_dir, + smt_timeout=smt_timeout, + smt_retry_limit=smt_retry_limit, + trace_rewrites=trace_rewrites, + start_server=start_server, + port=port, + ) as kcfg_explore: + proof = _method_to_apr_proof( + foundry, + contract, + method, + foundry.proofs_dir, + kcfg_explore, + test_id, + simplify_init=simplify_init, + bmc_depth=bmc_depth, + ) + + passed = kevm_prove( + foundry.kevm, + proof, + kcfg_explore, + max_depth=max_depth, + max_iterations=max_iterations, + break_every_step=break_every_step, + break_on_jumpi=break_on_jumpi, + break_on_calls=break_on_calls, + ) + failure_log = None + if not passed: + failure_log = print_failure_info(proof, kcfg_explore, counterexample_info) + return passed, failure_log + + def _split_test(test: tuple[str, int]) -> tuple[str, str, int]: + test_name, version = test + contract, method = test_name.split('.') + return contract, method, version + + init_problems = [_split_test(test) for test in tests] + + _apr_proofs: list[tuple[bool, list[str] | None]] + if workers > 1: + with ProcessPool(ncpus=workers) as process_pool: + _apr_proofs = process_pool.map(init_and_run_proof, init_problems) + else: + _apr_proofs = [] + for init_problem in init_problems: + _apr_proofs.append(init_and_run_proof(init_problem)) + + apr_proofs = dict(zip(tests, _apr_proofs, strict=True)) + return apr_proofs + + +def _method_to_apr_proof( + foundry: Foundry, + contract: Contract, + method: Contract.Method, + save_directory: Path, + kcfg_explore: KCFGExplore, + test_id: str, + simplify_init: bool = True, + bmc_depth: int | None = None, +) -> APRProof | APRBMCProof: + contract_name = contract.name + method_sig = method.signature + if Proof.proof_data_exists(test_id, save_directory): + apr_proof = foundry.get_apr_proof(test_id) + else: + _LOGGER.info(f'Initializing KCFG for test: {test_id}') + + setup_digest = None + if method_sig != 'setUp()' and 'setUp' in contract.method_by_name: + latest_version = foundry.latest_proof_version(f'{contract.name}.setUp()') + setup_digest = f'{contract_name}.setUp():{latest_version}' + _LOGGER.info(f'Using setUp method for test: {test_id}') + + empty_config = foundry.kevm.definition.empty_config(GENERATED_TOP_CELL) + kcfg, init_node_id, target_node_id = _method_to_cfg( + empty_config, contract, method, save_directory, init_state=setup_digest + ) + + _LOGGER.info(f'Expanding macros in initial state for test: {test_id}') + init_term = kcfg.node(init_node_id).cterm.kast + init_term = KDefinition__expand_macros(foundry.kevm.definition, init_term) + init_cterm = CTerm.from_kast(init_term) + _LOGGER.info(f'Computing definedness constraint for test: {test_id}') + init_cterm = kcfg_explore.cterm_assume_defined(init_cterm) + kcfg.replace_node(init_node_id, init_cterm) + + _LOGGER.info(f'Expanding macros in target state for test: {test_id}') + target_term = kcfg.node(target_node_id).cterm.kast + target_term = KDefinition__expand_macros(foundry.kevm.definition, target_term) + target_cterm = CTerm.from_kast(target_term) + kcfg.replace_node(target_node_id, target_cterm) + + if simplify_init: + _LOGGER.info(f'Simplifying KCFG for test: {test_id}') + kcfg_explore.simplify(kcfg, {}) + if bmc_depth is not None: + apr_proof = APRBMCProof( + test_id, + kcfg, + [], + init_node_id, + target_node_id, + {}, + bmc_depth, + proof_dir=save_directory, + ) + else: + apr_proof = APRProof(test_id, kcfg, [], init_node_id, target_node_id, {}, proof_dir=save_directory) + + apr_proof.write_proof_data() + return apr_proof + + +def _method_to_cfg( + empty_config: KInner, + contract: Contract, + method: Contract.Method, + kcfgs_dir: Path, + init_state: str | None = None, +) -> tuple[KCFG, NodeIdLike, NodeIdLike]: + calldata = method.calldata_cell(contract) + callvalue = method.callvalue_cell + init_cterm = _init_cterm( + empty_config, + contract.name, + kcfgs_dir, + calldata=calldata, + callvalue=callvalue, + init_state=init_state, + ) + is_test = method.name.startswith('test') + failing = method.name.startswith('testFail') + final_cterm = _final_cterm(empty_config, contract.name, failing=failing, is_test=is_test) + + cfg = KCFG() + init_node = cfg.create_node(init_cterm) + target_node = cfg.create_node(final_cterm) + + return cfg, init_node.id, target_node.id + + +def _init_cterm( + empty_config: KInner, + contract_name: str, + kcfgs_dir: Path, + *, + calldata: KInner | None = None, + callvalue: KInner | None = None, + init_state: str | None = None, +) -> CTerm: + program = KEVM.bin_runtime(KApply(f'contract_{contract_name}')) + account_cell = KEVM.account_cell( + Foundry.address_TEST_CONTRACT(), + intToken(0), + program, + KApply('.Map'), + KApply('.Map'), + intToken(1), + ) + init_subst = { + 'MODE_CELL': KApply('NORMAL'), + 'SCHEDULE_CELL': KApply('SHANGHAI_EVM'), + 'STATUSCODE_CELL': KVariable('STATUSCODE'), + 'CALLSTACK_CELL': KApply('.List'), + 'CALLDEPTH_CELL': intToken(0), + 'PROGRAM_CELL': program, + 'JUMPDESTS_CELL': KEVM.compute_valid_jumpdests(program), + 'ORIGIN_CELL': KVariable('ORIGIN_ID'), + 'LOG_CELL': KApply('.List'), + 'ID_CELL': Foundry.address_TEST_CONTRACT(), + 'CALLER_CELL': KVariable('CALLER_ID'), + 'ACCESSEDACCOUNTS_CELL': KApply('.Set'), + 'ACCESSEDSTORAGE_CELL': KApply('.Map'), + 'INTERIMSTATES_CELL': KApply('.List'), + 'LOCALMEM_CELL': KApply('.Bytes_BYTES-HOOKED_Bytes'), + 'PREVCALLER_CELL': KApply('.Account_EVM-TYPES_Account'), + 'PREVORIGIN_CELL': KApply('.Account_EVM-TYPES_Account'), + 'NEWCALLER_CELL': KApply('.Account_EVM-TYPES_Account'), + 'NEWORIGIN_CELL': KApply('.Account_EVM-TYPES_Account'), + 'ACTIVE_CELL': FALSE, + 'STATIC_CELL': FALSE, + 'MEMORYUSED_CELL': intToken(0), + 'WORDSTACK_CELL': KApply('.WordStack_EVM-TYPES_WordStack'), + 'PC_CELL': intToken(0), + 'GAS_CELL': intToken(9223372036854775807), + 'K_CELL': KSequence([KEVM.sharp_execute(), KVariable('CONTINUATION')]), + 'ACCOUNTS_CELL': KEVM.accounts( + [ + account_cell, # test contract address + Foundry.account_CHEATCODE_ADDRESS(KApply('.Map')), + ] + ), + 'SINGLECALL_CELL': FALSE, + 'ISREVERTEXPECTED_CELL': FALSE, + 'ISOPCODEEXPECTED_CELL': FALSE, + 'EXPECTEDADDRESS_CELL': KApply('.Account_EVM-TYPES_Account'), + 'EXPECTEDVALUE_CELL': intToken(0), + 'EXPECTEDDATA_CELL': KApply('.Bytes_BYTES-HOOKED_Bytes'), + 'OPCODETYPE_CELL': KApply('.OpcodeType_FOUNDRY-CHEAT-CODES_OpcodeType'), + 'RECORDEVENT_CELL': FALSE, + 'ISEVENTEXPECTED_CELL': FALSE, + 'ISCALLWHITELISTACTIVE_CELL': FALSE, + 'ISSTORAGEWHITELISTACTIVE_CELL': FALSE, + 'ADDRESSSET_CELL': KApply('.Set'), + 'STORAGESLOTSET_CELL': KApply('.Set'), + } + + constraints = None + if init_state: + accts, constraints = _get_final_accounts_cell(init_state, kcfgs_dir, overwrite_code_cell=program) + init_subst['ACCOUNTS_CELL'] = accts + + if calldata is not None: + init_subst['CALLDATA_CELL'] = calldata + + if callvalue is not None: + init_subst['CALLVALUE_CELL'] = callvalue + + init_term = Subst(init_subst)(empty_config) + init_cterm = CTerm.from_kast(init_term) + init_cterm = KEVM.add_invariant(init_cterm) + if constraints is None: + return init_cterm + else: + for constraint in constraints: + init_cterm = init_cterm.add_constraint(constraint) + return init_cterm + + +def _final_cterm(empty_config: KInner, contract_name: str, *, failing: bool, is_test: bool = True) -> CTerm: + final_term = _final_term(empty_config, contract_name) + dst_failed_post = KEVM.lookup(KVariable('CHEATCODE_STORAGE_FINAL'), Foundry.loc_FOUNDRY_FAILED()) + foundry_success = Foundry.success( + KVariable('STATUSCODE_FINAL'), + dst_failed_post, + KVariable('ISREVERTEXPECTED_FINAL'), + KVariable('ISOPCODEEXPECTED_FINAL'), + KVariable('RECORDEVENT_FINAL'), + KVariable('ISEVENTEXPECTED_FINAL'), + ) + final_cterm = CTerm.from_kast(final_term) + if is_test: + if not failing: + return final_cterm.add_constraint(mlEqualsTrue(foundry_success)) + else: + return final_cterm.add_constraint(mlEqualsTrue(notBool(foundry_success))) + return final_cterm + + +def _final_term(empty_config: KInner, contract_name: str) -> KInner: + program = KEVM.bin_runtime(KApply(f'contract_{contract_name}')) + post_account_cell = KEVM.account_cell( + Foundry.address_TEST_CONTRACT(), + KVariable('ACCT_BALANCE_FINAL'), + program, + KVariable('ACCT_STORAGE_FINAL'), + KVariable('ACCT_ORIGSTORAGE_FINAL'), + KVariable('ACCT_NONCE_FINAL'), + ) + final_subst = { + 'K_CELL': KSequence([KEVM.halt(), KVariable('CONTINUATION')]), + 'STATUSCODE_CELL': KVariable('STATUSCODE_FINAL'), + 'ID_CELL': Foundry.address_TEST_CONTRACT(), + 'ACCOUNTS_CELL': KEVM.accounts( + [ + post_account_cell, # test contract address + Foundry.account_CHEATCODE_ADDRESS(KVariable('CHEATCODE_STORAGE_FINAL')), + KVariable('ACCOUNTS_FINAL'), + ] + ), + 'ISREVERTEXPECTED_CELL': KVariable('ISREVERTEXPECTED_FINAL'), + 'ISOPCODEEXPECTED_CELL': KVariable('ISOPCODEEXPECTED_FINAL'), + 'RECORDEVENT_CELL': KVariable('RECORDEVENT_FINAL'), + 'ISEVENTEXPECTED_CELL': KVariable('ISEVENTEXPECTED_FINAL'), + 'ISCALLWHITELISTACTIVE_CELL': KVariable('ISCALLWHITELISTACTIVE_FINAL'), + 'ISSTORAGEWHITELISTACTIVE_CELL': KVariable('ISSTORAGEWHITELISTACTIVE_FINAL'), + 'ADDRESSSET_CELL': KVariable('ADDRESSSET_FINAL'), + 'STORAGESLOTSET_CELL': KVariable('STORAGESLOTSET_FINAL'), + } + return abstract_cell_vars( + Subst(final_subst)(empty_config), + [ + KVariable('STATUSCODE_FINAL'), + KVariable('ACCOUNTS_FINAL'), + KVariable('ISREVERTEXPECTED_FINAL'), + KVariable('ISOPCODEEXPECTED_FINAL'), + KVariable('RECORDEVENT_FINAL'), + KVariable('ISEVENTEXPECTED_FINAL'), + KVariable('ISCALLWHITELISTACTIVE_FINAL'), + KVariable('ISSTORAGEWHITELISTACTIVE_FINAL'), + KVariable('ADDRESSSET_FINAL'), + KVariable('STORAGESLOTSET_FINAL'), + ], + ) + + +def _get_final_accounts_cell( + proof_id: str, proof_dir: Path, overwrite_code_cell: KInner | None = None +) -> tuple[KInner, Iterable[KInner]]: + apr_proof = APRProof.read_proof_data(proof_dir, proof_id) + target = apr_proof.kcfg.node(apr_proof.target) + target_states = apr_proof.kcfg.covers(target_id=target.id) + if len(target_states) == 0: + raise ValueError( + f'setUp() function for {apr_proof.id} did not reach the end of execution. Maybe --max-iterations is too low?' + ) + if len(target_states) > 1: + raise ValueError(f'setUp() function for {apr_proof.id} branched and has {len(target_states)} target states.') + cterm = single(target_states).source.cterm + acct_cell = cterm.cell('ACCOUNTS_CELL') + + if overwrite_code_cell is not None: + new_accounts = [CTerm(account, []) for account in flatten_label('_AccountCellMap_', acct_cell)] + new_accounts_map = {account.cell('ACCTID_CELL'): account for account in new_accounts} + test_contract_account = new_accounts_map[Foundry.address_TEST_CONTRACT()] + + new_accounts_map[Foundry.address_TEST_CONTRACT()] = CTerm( + set_cell(test_contract_account.config, 'CODE_CELL', overwrite_code_cell), + [], + ) + + acct_cell = KEVM.accounts([account.config for account in new_accounts_map.values()]) + + fvars = free_vars(acct_cell) + acct_cons = constraints_for(fvars, cterm.constraints) + return (acct_cell, acct_cons) diff --git a/src/tests/integration/test_foundry_prove.py b/src/tests/integration/test_foundry_prove.py index e7d2f1995..6f3c1850c 100644 --- a/src/tests/integration/test_foundry_prove.py +++ b/src/tests/integration/test_foundry_prove.py @@ -10,15 +10,9 @@ from pyk.proof import APRProof from pyk.utils import run_process, single -from kontrol.foundry import ( - Foundry, - foundry_kompile, - foundry_merge_nodes, - foundry_prove, - foundry_remove_node, - foundry_show, - foundry_step_node, -) +from kontrol.foundry import Foundry, foundry_merge_nodes, foundry_remove_node, foundry_show, foundry_step_node +from kontrol.kompile import foundry_kompile +from kontrol.prove import foundry_prove from .utils import TEST_DATA_DIR diff --git a/src/tests/profiling/test_foundry_prove.py b/src/tests/profiling/test_foundry_prove.py index f7bc2e1a8..529d23f4d 100644 --- a/src/tests/profiling/test_foundry_prove.py +++ b/src/tests/profiling/test_foundry_prove.py @@ -6,7 +6,8 @@ from pyk.utils import run_process -from kontrol.foundry import foundry_kompile, foundry_prove +from kontrol.kompile import foundry_kompile +from kontrol.prove import foundry_prove from .utils import TEST_DATA_DIR