From df4b5c0742533261e861f05072a460993b1819ae Mon Sep 17 00:00:00 2001 From: Stevengre Date: Thu, 23 Jan 2025 13:49:44 +0000 Subject: [PATCH] simlify the implementation --- kevm-pyk/src/kevm_pyk/__main__.py | 30 +- kevm-pyk/src/kevm_pyk/summarizer.py | 368 +++++++----------- .../src/tests/integration/test_summarize.py | 52 ++- 3 files changed, 199 insertions(+), 251 deletions(-) diff --git a/kevm-pyk/src/kevm_pyk/__main__.py b/kevm-pyk/src/kevm_pyk/__main__.py index 3824cf0a40..0acbf88a0d 100644 --- a/kevm-pyk/src/kevm_pyk/__main__.py +++ b/kevm-pyk/src/kevm_pyk/__main__.py @@ -32,7 +32,7 @@ from pyk.proof.tui import APRProofViewer from pyk.utils import FrozenDict, hash_str, single -from kevm_pyk.summarizer import KEVMSummarizer, get_todo_list, summarize +from kevm_pyk.summarizer import summarize from . import VERSION, config from .cli import _create_argument_parser, generate_options, get_argument_type_setter, get_option_string_destination @@ -636,18 +636,28 @@ def exec_kast(options: KastOptions) -> None: def exec_summarize(options: ProveOptions) -> None: - proof_dir = Path(__file__).parent / 'proofs' - save_directory = Path(__file__).parent / 'summaries' - p = APRProof.read_proof_data(Path(__file__).parent / 'proofs', 'STOP-SUMMARY-16-TO-10') - # print('proof initial state -------------------------------') - # print(p.kcfg.node(p.init).cterm) - summarizer = KEVMSummarizer(proof_dir, save_directory) - summarizer.explore(p) - + # Path(__file__).parent / 'proofs' + # Path(__file__).parent / 'summaries' + # kevm = KEVM(kdist.get('evm-semantics.haskell')) + # p = APRProof.read_proof_data(Path(__file__).parent / 'proofs', 'BALANCE_1_SPEC') + # node_printer = kevm_node_printer(kevm, p) + # proof_show = APRProofShow(kevm, node_printer=node_printer) + + # res_lines = proof_show.show( + # p, + # nodes=[node.id for node in p.kcfg.nodes], + # ) + # for res_line in res_lines: + # print(res_line) + # # print('proof initial state -------------------------------') + # # print(p.kcfg.node(p.init).cterm) + # summarizer = KEVMSummarizer(proof_dir, save_directory) + # summarizer.explore(p) + # batch_summarize() # next_opcode = get_todo_list() # _LOGGER.info(f'summarizing {next_opcode}') - # summarize(next_opcode[0]) + summarize('BALANCE') # analyze_proof('CREATE_3', 3) diff --git a/kevm-pyk/src/kevm_pyk/summarizer.py b/kevm-pyk/src/kevm_pyk/summarizer.py index 668cc2873c..dcdab203a1 100644 --- a/kevm-pyk/src/kevm_pyk/summarizer.py +++ b/kevm-pyk/src/kevm_pyk/summarizer.py @@ -9,7 +9,7 @@ from pyk.cterm import CSubst, CTerm, CTermSymbolic, cterm_build_claim from pyk.kast.inner import KApply, KInner, KSequence, KToken, KVariable, Subst -from pyk.kast.outer import KClaim, KSort +from pyk.kast.outer import KSort from pyk.kcfg import KCFG, KCFGExplore from pyk.kdist import kdist from pyk.kore.rpc import KoreClient @@ -205,6 +205,39 @@ } +NOT_USEGAS_OPCODES = [ + 'BALANCE', + 'EXTCODESIZE', + 'EXTCODECOPY', + 'CALLER', + 'RETURNDATASIZE', + 'EXTCODEHASH', + 'COINBASE', + 'SELFBALANCE', + 'POP', + 'MLOAD', + 'MSTORE8', + 'SLOAD', + 'SSTORE', + 'JUMP', + 'JUMPI', + 'MSIZE', + 'TLOAD', + 'TSTORE', + 'PUSH', + 'CREATE', + 'CALL', +] + +ACCOUNT_QUERIES_OPCODES = [ + 'BALANCE', + 'EXTCODESIZE', + 'EXTCODEHASH' 'EXTCODECOPY', +] + +ACCOUNT_STORAGE_OPCODES = ['SLOAD', 'SSTORE', 'TLOAD', 'TSTORE'] + + def get_passed_opcodes() -> list[str]: passed_opcodes = [] for opcode in OPCODES_SUMMARY_STATUS: @@ -269,13 +302,48 @@ def stack_needed(opcode_id: str) -> list[int]: return [0] +def accounts_cell(acct_id: str) -> tuple[KInner, KInner]: + """Return the accounts cell with constraints on the accounts.""" + acct_id_cell = KApply('', KVariable(acct_id, KSort('Int'))) + balance_cell = KApply('', KVariable('BALANCE_CELL', KSort('Int'))) + code_cell = KApply('', KVariable('CODE_CELL', KSort('AccountCode'))) + storage_cell = KApply('', KVariable('STORAGE_CELL', KSort('Map'))) + orig_storage_cell = KApply('', KVariable('ORIG_STORAGE_CELL', KSort('Map'))) + transient_storage_cell = KApply('', KVariable('TRANSIENT_STORAGE_CELL', KSort('Map'))) + nonce_cell = KApply('', KVariable('NONCE_CELL', KSort('Int'))) + account_cell = KApply( + '', + [ + acct_id_cell, + balance_cell, + code_cell, + storage_cell, + orig_storage_cell, + transient_storage_cell, + nonce_cell, + ], + ) + dot_account_var = KVariable('DotAccountVar', KSort('AccountCellMap')) + constraint = mlEqualsFalse( + KApply( + 'AccountCellMap:in_keys', + [ + KApply('', KVariable(acct_id, KSort('Int'))), + KVariable('DotAccountVar', KSort('AccountCellMap')), + ], + ) + ) + + return KApply('_AccountCellMap_', [account_cell, dot_account_var]), constraint + + class KEVMSummarizer: """ A class for summarizing the instructions of the KEVM. - 1. `build_spec` Build the proof to symbolically execute one abitrary opcode. - 2. `explore` Run the proof to get the KCFG. - 3. `summarize` Minimize the KCFG to get the summarized rules for opcodes. + 1. `build_spec` builds the proof to symbolically execute one abitrary opcode. + 2. `explore` runs the proof to get the KCFG. + 3. `summarize` minimizes the KCFG to get the summarized rules for opcodes. """ _cterm_symbolic: CTermSymbolic @@ -288,188 +356,87 @@ def __init__(self, proof_dir: Path, save_directory: Path) -> None: self.proof_dir = proof_dir self.save_directory = save_directory + def build_stack_underflow_spec(self) -> APRProof: + """Build the specification to symbolically execute abitrary instruction with stack underflow.""" + # TODO: + ... + def build_spec(self, opcode: KApply, stack_needed: int, id_str: str = '') -> APRProof: - """ - Build the specification to symbolically execute one abitrary instruction. - """ + """Build the specification to symbolically execute one abitrary instruction.""" cterm = CTerm(self.kevm.definition.empty_config(KSort('GeneratedTopCell'))) + opcode_symbol = opcode.label.name.split('_')[0] # construct the initial substitution + _init_subst: dict[str, KInner] = {} + _init_constraints: list[KInner] = [] next_opcode = KApply('#next[_]_EVM_InternalOp_MaybeOpCode', opcode) - _init_subst: dict[str, KInner] = {'K_CELL': KSequence([next_opcode, KVariable('K_CELL')])} - _init_subst['WORDSTACK_CELL'] = word_stack(stack_needed) + _init_subst['K_CELL'] = KSequence([next_opcode, KVariable('K_CELL')]) # #next [ OPCODE ] ~> K_CELL + _init_subst['WORDSTACK_CELL'] = word_stack(stack_needed) # W0 : W1 : ... : Wn for not underflow + _init_subst['ID_CELL'] = KVariable('ID_CELL', KSort('Int')) # ID_CELL should be Int for ADDRESS, LOG. + # This is because #push doesn't handle `.Account`. And it's okay to be Int for other opcodes. + _init_subst['CALLER_CELL'] = KVariable('CALLER_CELL', KSort('Int')) # CALLER_CELL should be Int for CALLER. + _init_subst['ORIGIN_CELL'] = KVariable('ORIGIN_CELL', KSort('Int')) # ORIGIN_CELL should be Int for ORIGIN. + _init_subst['GAS_CELL'] = KEVM.inf_gas( + KVariable('GAS_CELL', KSort('Gas')) + ) # inf_gas to reduce the computation cost. + if opcode_symbol in NOT_USEGAS_OPCODES: + _init_subst['USEGAS_CELL'] = KToken('false', KSort('Bool')) # TODO: cannot usegas for these opcodes. + if opcode_symbol in ACCOUNT_QUERIES_OPCODES: + cell, constraint = accounts_cell('W0') + _init_subst['ACCOUNTS_CELL'] = cell + _init_constraints.append(constraint) + if opcode_symbol in ACCOUNT_STORAGE_OPCODES: + cell, constraint = accounts_cell('ID_CELL') + _init_subst['ACCOUNTS_CELL'] = cell + _init_constraints.append(constraint) init_subst = CSubst(Subst(_init_subst), ()) - # TODO use inf_gas for gas cell. # construct the final substitution _final_subst: dict[str, KInner] = {vname: KVariable('FINAL_' + vname) for vname in cterm.free_vars} _final_subst['K_CELL'] = KVariable('K_CELL') + if opcode_symbol == 'JUMP': + _final_subst['K_CELL'] = KSequence([KApply('#endBasicBlock_EVM_InternalOp'), KVariable('K_CELL')]) final_subst = CSubst(Subst(_final_subst), ()) - opcode_symbol = opcode.label.name.split('_')[0] + init_cterm = init_subst(cterm) + init_cterm = CTerm(init_cterm.config, _init_constraints + list(init_cterm.constraints)) + kclaim = cterm_build_claim(f'{opcode_symbol}{id_str}_{stack_needed}_SPEC', init_cterm, final_subst(cterm)) + proof = APRProof.from_claim(self.kevm.definition, kclaim[0], {}, self.proof_dir) + return proof - kclaim, subst = cterm_build_claim( - f'{opcode_symbol}{id_str}_{stack_needed}_SPEC', init_subst(cterm), final_subst(cterm) - ) - # >> TODO: The cterm_build_claim will remove all the type I've set. - kclaim = KClaim(subst(kclaim.body), subst(kclaim.requires), subst(kclaim.ensures), kclaim.att) - proof = APRProof.from_claim(self.kevm.definition, kclaim, {}, self.proof_dir) - # SummarizeConfig class; - if opcode_symbol in ['ADDRESS', 'LOG']: - # >> CHECK THIS: Because #push doesn't handle `.Account`, we need to set the type of `ID_CELL` to `Int` - _LOGGER.info(f'Setting the type of `ID_CELL` to `Int` for {opcode_symbol}') - _type_subst: dict[str, KInner] = {'ID_CELL': KVariable('ID_CELL', KSort('Int'))} # for every opcode. - type_subst = CSubst(Subst(_type_subst), ()) - node = proof.kcfg.get_node(1) - proof.kcfg.let_node(1, cterm=type_subst(node.cterm), attrs=node.attrs) - if opcode_symbol == 'CALLER': - _LOGGER.info(f'Setting the type of `CALLER_CELL` to `Int` for {opcode_symbol}') - _type_subst: dict[str, KInner] = {'CALLER_CELL': KVariable('CALLER_CELL', KSort('Int'))} - type_subst = CSubst(Subst(_type_subst), ()) - node = proof.kcfg.get_node(1) - proof.kcfg.let_node(1, cterm=type_subst(node.cterm), attrs=node.attrs) - if opcode_symbol == 'ORIGIN': - _LOGGER.info(f'Setting the type of `ORIGIN_CELL` to `Int` for {opcode_symbol}') - _type_subst: dict[str, KInner] = {'ORIGIN_CELL': KVariable('ORIGIN_CELL', KSort('Int'))} - type_subst = CSubst(Subst(_type_subst), ()) - node = proof.kcfg.get_node(1) - proof.kcfg.let_node(1, cterm=type_subst(node.cterm), attrs=node.attrs) - if opcode_symbol in [ - 'BALANCE', - 'EXTCODESIZE', - 'EXTCODECOPY', - 'CALLER', - 'RETURNDATASIZE', - 'EXTCODEHASH', - 'COINBASE', - 'SELFBALANCE', - 'POP', - 'MLOAD', - 'MSTORE8', - 'SLOAD', - 'SSTORE', - 'JUMP', - 'JUMPI', - 'MSIZE', - 'TLOAD', - 'TSTORE', - 'PUSH', - 'CREATE', - 'CALL', - ]: - # >> CHECK THIS: don't calculate Gas - _LOGGER.info(f'Setting the type of `USEGAS_CELL` to `false` for {opcode_symbol}') - _gas_subst: dict[str, KInner] = {'USEGAS_CELL': KToken('false', KSort('Bool'))} - gas_subst = CSubst(Subst(_gas_subst), ()) - node = proof.kcfg.get_node(1) - proof.kcfg.let_node(1, cterm=gas_subst(node.cterm), attrs=node.attrs) - if opcode_symbol in ['SELFBALANCE', 'SLOAD', 'SSTORE', 'TLOAD', 'TSTORE', 'CREATE']: - _LOGGER.info(f'Setting the type of `ID_CELL` to `Int` for {opcode_symbol}') - _subst: dict[str, KInner] = {'ID_CELL': KVariable('ID_CELL', KSort('Int'))} - - _LOGGER.info(f'Setting more concrete `ACCOUNTS_CELL` for {opcode_symbol}') - acct_id_cell = KApply('', KVariable('WO', KSort('Int'))) # --> ID_CELL for every opcode. - balance_cell = KApply('', KVariable('BALANCE_CELL', KSort('Int'))) - code_cell = KApply('', KVariable('CODE_CELL', KSort('AccountCode'))) - storage_cell = KApply('', KVariable('STORAGE_CELL', KSort('Map'))) - orig_storage_cell = KApply('', KVariable('ORIG_STORAGE_CELL', KSort('Map'))) - transient_storage_cell = KApply('', KVariable('TRANSIENT_STORAGE_CELL', KSort('Map'))) - nonce_cell = KApply('', KVariable('NONCE_CELL', KSort('Int'))) - account_cell = KApply( - '', - [ - acct_id_cell, - balance_cell, - code_cell, - storage_cell, - orig_storage_cell, - transient_storage_cell, - nonce_cell, - ], - ) - dot_account_var = KVariable('DotAccountVar', KSort('AccountCellMap')) - _subst['ACCOUNTS_CELL'] = KApply('_AccountCellMap_', [account_cell, dot_account_var]) - - _LOGGER.info(f'Setting constraints on `ACCOUNTS_CELL` for {opcode_symbol}') - constraint = mlEqualsFalse( - KApply( - 'AccountCellMap:in_keys', - [ - KApply('', KVariable('W0', KSort('Int'))), - KVariable('DotAccountVar', KSort('AccountCellMap')), - ], + def build_specs(self, opcode_symbol: str) -> list[APRProof]: + needs = stack_needed(opcode_symbol) + opcode = OPCODES[opcode_symbol] + proofs = [] + for need in needs: + if len(needs) > 1: + opcode = KApply(opcode.label.name, KToken(str(need), KSort('Int'))) + + if opcode_symbol == 'JUMPI': + proof = self.build_spec(opcode, need, id_str='_FALSE') + constraint = mlEquals(KVariable('W1', KSort('Int')), KToken('0', KSort('Int')), 'Int') + proof.kcfg.let_node( + 1, cterm=proof.kcfg.get_node(1).cterm.add_constraint(constraint), attrs=proof.kcfg.get_node(1).attrs ) - ) - - subst = CSubst(Subst(_subst), ()) - node = proof.kcfg.get_node(1) - new_cterm = subst(node.cterm) - new_cterm = new_cterm.add_constraint(constraint) - proof.kcfg.let_node(1, cterm=new_cterm, attrs=node.attrs) - - if opcode_symbol in ['CALL']: - _LOGGER.info(f'Setting the type of `ID_CELL` to `Int` for {opcode_symbol}') - _subst: dict[str, KInner] = {'ID_CELL': KVariable('ID_CELL', KSort('Int'))} - - _LOGGER.info(f'Setting more concrete `ACCOUNTS_CELL` for {opcode_symbol}') - acct_id_cell = KApply('', KVariable('ID_CELL', KSort('Int'))) - balance_cell = KApply('', KVariable('BALANCE_CELL', KSort('Int'))) - code_cell = KApply('', KVariable('CODE_CELL', KSort('AccountCode'))) - storage_cell = KApply('', KVariable('STORAGE_CELL', KSort('Map'))) - orig_storage_cell = KApply('', KVariable('ORIG_STORAGE_CELL', KSort('Map'))) - transient_storage_cell = KApply('', KVariable('TRANSIENT_STORAGE_CELL', KSort('Map'))) - nonce_cell = KApply('', KVariable('NONCE_CELL', KSort('Int'))) - account_cell = KApply( - '', - [ - acct_id_cell, - balance_cell, - code_cell, - storage_cell, - orig_storage_cell, - transient_storage_cell, - nonce_cell, - ], - ) - dot_account_var = KVariable('DotAccountVar', KSort('AccountCellMap')) - _subst['ACCOUNTS_CELL'] = KApply('_AccountCellMap_', [account_cell, dot_account_var]) - - _LOGGER.info(f'Setting constraints on `ACCOUNTS_CELL` for {opcode_symbol}') - constraint = mlEqualsFalse( - KApply( - 'AccountCellMap:in_keys', - [ - KApply('', KVariable('ID_CELL', KSort('Int'))), - KVariable('DotAccountVar', KSort('AccountCellMap')), - ], + proofs.append(proof) + + proof = self.build_spec(opcode, need, id_str='_TRUE') + _subst = {'K_CELL': KSequence([KApply('#endBasicBlock_EVM_InternalOp'), KVariable('K_CELL')])} + subst = CSubst(Subst(_subst), ()) + constraint = mlNot(mlEquals(KVariable('W1', KSort('Int')), KToken('0', KSort('Int')), 'Int')) + proof.kcfg.let_node( + 1, cterm=proof.kcfg.get_node(1).cterm.add_constraint(constraint), attrs=proof.kcfg.get_node(1).attrs ) - ) - - subst = CSubst(Subst(_subst), ()) - node = proof.kcfg.get_node(1) - new_cterm = subst(node.cterm) - new_cterm = new_cterm.add_constraint(constraint) - proof.kcfg.let_node(1, cterm=new_cterm, attrs=node.attrs) - - if opcode_symbol in ['JUMP']: - _LOGGER.info(f'Setting the final state of `K_CELL` to `#endBasicBlock ~> K_CELL` for {opcode_symbol}') - _subst = {'K_CELL': KSequence([KApply('#endBasicBlock_EVM_InternalOp'), KVariable('K_CELL')])} - subst = CSubst(Subst(_subst), ()) - node = proof.kcfg.get_node(2) - proof.kcfg.let_node(2, cterm=subst(node.cterm), attrs=node.attrs) - - # if opcode_symbol in ['JUMPI']: - # _LOGGER.info(f'Setting the final state of `K_CELL` to `#endBasicBlock ~> K_CELL` for {opcode_symbol}') - # _subst = {'K_CELL': KVariable('FINAL_K_CELL')} - # subst = CSubst(Subst(_subst), ()) - # node = proof.kcfg.get_node(2) - # constraint0 = mlEquals(KVariable('FINAL_K_CELL'), KVariable('K_CELL')) - # constraint1 = mlEquals(KVariable('FINAL_K_CELL'), KSequence([KApply('#endBasicBlock_EVM_InternalOp'), KVariable('K_CELL')])) - # constraint = mlOr([constraint0, constraint1]) - # proof.kcfg.let_node(2, cterm=subst(node.cterm).add_constraint(constraint), attrs=node.attrs) - - _LOGGER.debug(proof.kcfg.nodes[0].cterm.to_dict()) - return proof + proof.kcfg.let_node(2, cterm=subst(proof.kcfg.get_node(2).cterm), attrs=proof.kcfg.get_node(2).attrs) + proofs.append(proof) + elif opcode_symbol == 'LOG': + need += 2 + proof = self.build_spec(opcode, need) + proofs.append(proof) + else: + proof = self.build_spec(opcode, need) + proofs.append(proof) + return proofs def explore(self, proof: APRProof) -> bool: """ @@ -571,7 +538,7 @@ def create_kcfg_explore() -> KCFGExplore: return passed def summarize(self, proof: APRProof, merge: bool = False) -> None: - # TODO: need customized minimization rules, maybe + # TODO: may need customized way to generate summary rules, e.g., replacing infinite gas with finite gas. proof.minimize_kcfg(KEVMSemantics(allow_symbolic_program=True), merge) node_printer = kevm_node_printer(self.kevm, proof) proof_show = APRProofShow(self.kevm, node_printer=node_printer) @@ -611,7 +578,6 @@ def batch_summarize(num_processes: int = 4) -> None: opcodes_to_process = [op for op in OPCODES.keys()] passed_opcodes = get_passed_opcodes() unpassed_opcodes = [opcode for opcode in opcodes_to_process if opcode not in passed_opcodes] - # 分成两组,一组是有CALL的,一组是没有的 has_call_opcodes = [opcode for opcode in unpassed_opcodes if 'Call' in OPCODES[opcode].label.name] no_call_opcodes = [opcode for opcode in unpassed_opcodes if 'Call' not in OPCODES[opcode].label.name] @@ -630,51 +596,12 @@ def summarize(opcode_symbol: str) -> tuple[KEVMSummarizer, list[APRProof]]: proof_dir = Path(__file__).parent / 'proofs' save_directory = Path(__file__).parent / 'summaries' summarizer = KEVMSummarizer(proof_dir, save_directory) - needs = stack_needed(opcode_symbol) - opcode = OPCODES[opcode_symbol] - proofs = [] - for need in needs: - if len(needs) > 1: - opcode = KApply(opcode.label.name, KToken(str(need), KSort('Int'))) - if opcode_symbol == 'JUMPI': - proof = summarizer.build_spec(opcode, need, id_str='_FALSE') - constraint = mlEquals(KVariable('W1', KSort('Int')), KToken('0', KSort('Int')), 'Int') - proof.kcfg.let_node( - 1, cterm=proof.kcfg.get_node(1).cterm.add_constraint(constraint), attrs=proof.kcfg.get_node(1).attrs - ) - summarizer.explore(proof) - summarizer.summarize(proof) - proof.write_proof_data() - proofs.append(proof) - - proof = summarizer.build_spec(opcode, need, id_str='_TRUE') - _subst = {'K_CELL': KSequence([KApply('#endBasicBlock_EVM_InternalOp'), KVariable('K_CELL')])} - subst = CSubst(Subst(_subst), ()) - constraint = mlNot(mlEquals(KVariable('W1', KSort('Int')), KToken('0', KSort('Int')), 'Int')) - proof.kcfg.let_node( - 1, cterm=proof.kcfg.get_node(1).cterm.add_constraint(constraint), attrs=proof.kcfg.get_node(1).attrs - ) - proof.kcfg.let_node(2, cterm=subst(proof.kcfg.get_node(2).cterm), attrs=proof.kcfg.get_node(2).attrs) - summarizer.explore(proof) - summarizer.summarize(proof) - proof.write_proof_data() - proofs.append(proof) - elif opcode_symbol == 'LOG': - need += 2 - proof = summarizer.build_spec(opcode, need) - summarizer.explore(proof) - summarizer.summarize(proof) - proof.write_proof_data() - proofs.append(proof) - else: - proof = summarizer.build_spec(opcode, need) - summarizer.explore(proof) - summarizer.summarize(proof) - proof.write_proof_data() - proofs.append(proof) + proofs = summarizer.build_specs(opcode_symbol) + for proof in proofs: + summarizer.explore(proof) + summarizer.summarize(proof) + proof.write_proof_data() return summarizer, proofs - # summarizer.analyze_proof(proof_dir / 'STOP_SPEC') - # validation: generate them as claims and call kevm prove. def analyze_proof(opcode: str, node_id: int) -> None: @@ -682,10 +609,3 @@ def analyze_proof(opcode: str, node_id: int) -> None: save_directory = Path(__file__).parent / 'summaries' summarizer = KEVMSummarizer(proof_dir, save_directory) summarizer.analyze_proof(proof_dir / f'{opcode}_SPEC', node_id) - -def debug_proof(proof_id: str) -> None: - proof_dir = Path(__file__).parent / 'proofs' - save_directory = Path(__file__).parent / 'summaries' - summarizer = KEVMSummarizer(proof_dir, save_directory) - proof = APRProof.read_proof_data(proof_dir, proof_id) - \ No newline at end of file diff --git a/kevm-pyk/src/tests/integration/test_summarize.py b/kevm-pyk/src/tests/integration/test_summarize.py index d92353c8ab..3c1cdab0c9 100644 --- a/kevm-pyk/src/tests/integration/test_summarize.py +++ b/kevm-pyk/src/tests/integration/test_summarize.py @@ -1,53 +1,71 @@ -from ast import Import -from kevm_pyk.summarizer import OPCODES, get_summary_status, OPCODES_SUMMARY_STATUS, summarize import pytest -from pyk.kcfg import KCFG -from pyk.kast.inner import KVariable, Subst -from pyk.kast.outer import KFlatModule, KRequire, KImport from pyk.kast.att import Atts -from pyk.cterm import CSubst +from pyk.kast.outer import KClaim +from pyk.kcfg import KCFG from pyk.proof.reachability import APRProof +from kevm_pyk.summarizer import OPCODES, OPCODES_SUMMARY_STATUS, get_summary_status, summarize + + @pytest.mark.parametrize('opcode', OPCODES.keys()) def test_summarize(opcode: str) -> None: if get_summary_status(opcode) != 'PASSED': pytest.skip(f'{opcode} status: {OPCODES_SUMMARY_STATUS[opcode]}') - if opcode == 'STOP': + if opcode == 'ADD': + print(f'[{opcode}] selected') summarizer, proofs = summarize(opcode) + print(f'[{opcode}] summarize finished') + print(f'[{opcode}] {len(proofs)} proofs generated') + + # + # proof_dir = Path(__file__).parent / 'proofs' + # save_directory = Path(__file__).parent / 'summaries' + # proofs = [APRProof.read_proof_data(Path(__file__).parent.parent.parent / 'kevm_pyk' / 'proofs', 'ADD_2_SPEC')] + # print(f'{len(proofs)} proofs') + for proof in proofs: - claims = [] - + claims: list[KClaim] = [] + # no pending, failing, bounded nodes in the proof for leaf in proof.kcfg.leaves: assert not proof.is_pending(leaf.id) assert not proof.kcfg.is_stuck(leaf.id) assert not proof.kcfg.is_vacuous(leaf.id) assert not proof.is_bounded(leaf.id) - + print(f'[{opcode}] Node {leaf.id} is not pending, stuck, vacuous, or bounded') + # only one successor from the initial node in the proof successors = proof.kcfg.successors(proof.init) assert len(successors) == 1 successor = successors[0] - + print(f'[{opcode}] Node {proof.init} has only one successor') + # only one edge to the terminal node if isinstance(successor, KCFG.Split): targets = successor.targets assert len(proof.kcfg.edges()) == len(targets) + print(f'[{opcode}] Split has {len(targets)} targets') for target in targets: successors = proof.kcfg.successors(target.id) assert len(successors) == 1 edge = successors[0] assert isinstance(edge, KCFG.Edge) - assert proof.is_terminal(edge.target.id) - claims.append(edge.to_rule(f'{opcode}-SUMMARY', claim = True)) + assert proof.is_terminal(edge.target.id) or proof.kcfg.is_covered(edge.target.id) + claims.append(edge.to_rule(f'{opcode}-SUMMARY', claim=True)) + print(f'[{opcode}] Edge {edge.source.id} -> {edge.target.id} is terminal or covered') else: assert len(proof.kcfg.edges()) == 1 assert isinstance(successor, KCFG.Edge) - assert proof.is_terminal(successor.target.id) - claims.append(successor.to_rule(f'{opcode}-SUMMARY', claim = True)) - + assert proof.is_terminal(successor.target.id) or proof.kcfg.is_covered(successor.target.id) + claims.append(successor.to_rule(f'{opcode}-SUMMARY', claim=True)) + print(f'[{opcode}] Edge {successor.source.id} -> {successor.target.id} is terminal or covered') + # prove the correctness of the summary for claim in claims: - assert summarizer.explore(APRProof.from_claim(summarizer.kevm.definition, claim, {}, summarizer.proof_dir)) + print(f'[{opcode}] Proving claim {claim.att.get(Atts.LABEL)}') + assert summarizer.explore( + APRProof.from_claim(summarizer.kevm.definition, claim, {}, summarizer.proof_dir) + ) + print(f'[{opcode}] Claim {claim.att.get(Atts.LABEL)} proved') else: pytest.skip(f'{opcode} ignored')