From af59669a5abb26314d11bb2c10dad5fad4a5ac9d Mon Sep 17 00:00:00 2001 From: Paschal Amusuo Date: Sat, 15 Nov 2025 20:14:22 -0500 Subject: [PATCH 1/5] Added the coverage analyzer tool --- agent/coverage_analyzer.py | 43 +++++++++++- agent_tests/coverage_analyzer_test.py | 49 ++++++++++++++ experiment/textcov.py | 75 +++++++++++++++++++++ experiment/workdir.py | 13 ++++ llm_toolkit/prompt_builder.py | 4 +- prompts/agent/coverage-analyzer-priming.txt | 4 +- prompts/tool/coverage_tool.txt | 34 ++++++++++ tool/coverage_tool.py | 35 ++++++++++ 8 files changed, 251 insertions(+), 6 deletions(-) create mode 100644 agent_tests/coverage_analyzer_test.py create mode 100644 prompts/tool/coverage_tool.txt create mode 100644 tool/coverage_tool.py diff --git a/agent/coverage_analyzer.py b/agent/coverage_analyzer.py index fa2adb575e..7364f5667f 100644 --- a/agent/coverage_analyzer.py +++ b/agent/coverage_analyzer.py @@ -25,6 +25,7 @@ from llm_toolkit.prompts import Prompt from results import AnalysisResult, CoverageResult, Result, RunResult from tool.container_tool import ProjectContainerTool +from tool.coverage_tool import CoverageTool INVALID_PRMOT_PATH = os.path.join('prompts', 'agent', 'coverage-analyzer-invalid-response.txt') @@ -49,7 +50,8 @@ def _initial_prompt(self, results: list[Result]) -> Prompt: builder = CoverageAnalyzerTemplateBuilder(self.llm, benchmark, last_result) prompt = builder.build(example_pair=[], - tool_guides=self.inspect_tool.tutorial(), + tool_guides=[self.inspect_tool.tutorial(), + self.coverage_tool.tutorial()], project_dir=self.inspect_tool.project_dir, function_requirements=function_requirements) # TODO: A different file name/dir. @@ -75,6 +77,26 @@ def _container_handle_conclusion(self, cur_round: int, response: str, return None + def _container_handle_coverage_request(self, response: str, tool: CoverageTool, + prompt: Prompt) -> Prompt: + """Executes coverage tool commands from LLM response.""" + command = self._parse_tag(response, 'coverage-request') + if not command: + return prompt + + logger.info('Executing coverage command: %s', command, trial=self.trial) + coverage_report = tool.execute(command) or 'No coverage report found.' + tool_result = ( + '' + f'{command}' + '' + '' + f'{coverage_report}' + '' + ) + prompt.append(tool_result) + return prompt + def _container_tool_reaction( self, cur_round: int, response: str, run_result: RunResult, coverage_result: CoverageResult) -> Optional[Prompt]: @@ -82,8 +104,12 @@ def _container_tool_reaction( del run_result prompt = prompt_builder.DefaultTemplateBuilder(self.llm, None).build([]) - prompt = self._container_handle_bash_commands(response, self.inspect_tool, + if self._parse_tags(response, 'bash'): + prompt = self._container_handle_bash_commands(response, self.inspect_tool, prompt) + elif self._parse_tags(response, 'coverage-request'): + prompt = self._container_handle_coverage_request( + response, self.coverage_tool, prompt) # Only report conclusion when no more bash investigation is required. if not prompt.gettext(): # Then build fuzz target. @@ -120,6 +146,19 @@ def execute(self, result_history: list[Result]) -> AnalysisResult: content=last_result.build_script_source, file_path=self.inspect_tool.build_script_path) self.inspect_tool.compile(extra_commands=' && rm -rf /out/* > /dev/null') + logger.info('This is trial %d, container id: %s', + last_result.trial, + self.inspect_tool.container_id, + trial=last_result.trial) + textcov_report = self.args.work_dirs.textcov_report( + last_result.trial) + if not textcov_report or not os.path.exists(textcov_report): + logger.error('Textcov report not found at %s', + textcov_report, + trial=last_result.trial) + + self.coverage_tool = CoverageTool(benchmark, + textcov_report) cur_round = 1 coverage_result = CoverageResult() prompt = self._initial_prompt(result_history) diff --git a/agent_tests/coverage_analyzer_test.py b/agent_tests/coverage_analyzer_test.py new file mode 100644 index 0000000000..16907cd42a --- /dev/null +++ b/agent_tests/coverage_analyzer_test.py @@ -0,0 +1,49 @@ +import os +from agent_tests.base_agent_test import BaseAgentTest +from results import CoverageResult, RunResult + + +class CoverageAnalyzerAgentTest(BaseAgentTest): + """Test for the CoverageAnalyzer agent.""" + + def setup_initial_result_list(self, benchmark, prompt): + """Sets up the initial result list for the CoverageAnalyzer agent test.""" + + # Get necessary data from prompt + fuzz_target_source = self._parse_tag(prompt, 'fuzz target') + fuzzing_log = self._parse_tag(prompt, 'fuzzing log') + function_requirements = self._parse_tag(prompt, 'function-requirements') + + if function_requirements: + # Save to requirements file + self.write_requirements_to_file(self.args, function_requirements) + + # Walk through the directory to find coverage report files + covreports = [] + for root, dirs, files in os.walk(self.args.additional_files_path): + for file in files: + file_path = os.path.join(root, file) + if file.endswith('.covreport'): + covreports.append(file_path) + + if covreports: + textcov_dir = os.path.join(self.args.work_dirs.code_coverage_report( + f'{self.trial:02d}.fuzz-target'), 'textcov') + + os.makedirs(textcov_dir, exist_ok=True) + dst_file_path = os.path.join(textcov_dir, os.path.basename(covreports[0])) + + with open(covreports[0], 'rb') as file: + with open(dst_file_path, 'wb') as dst_file: + dst_file.write(file.read()) + + run_result = RunResult(benchmark=benchmark, + trial=self.trial, + work_dirs=self.args.work_dirs, + author=None, + chat_history={}, + crashes=False, + fuzz_target_source=fuzz_target_source, + run_log=fuzzing_log) + + return [run_result] \ No newline at end of file diff --git a/experiment/textcov.py b/experiment/textcov.py index 0fc2f531ab..c8eda68e5b 100644 --- a/experiment/textcov.py +++ b/experiment/textcov.py @@ -140,6 +140,14 @@ def subtract_covered_lines(self, other: Function, language: str = 'c++'): if line.hit_count and line.contents in self.lines: del self.lines[line.contents] +@dataclasses.dataclass +class RawFunctionReport: + """Represents a function report in a textcov.""" + name: str = '' + lines: list[str] = dataclasses.field(default_factory=list) + + def append_line(self, line: str): + self.lines.append(line) @dataclasses.dataclass class File: @@ -176,6 +184,8 @@ class Textcov: # Function name -> Function object. # For JVM / C / C++ / Rust functions: dict[str, Function] = dataclasses.field(default_factory=dict) + # Normalized function name -> extracted coverage reports for function. + raw_coverage_report: dict[str, RawFunctionReport] = dataclasses.field(default_factory=dict) # File name -> File object. # For Python files: dict[str, File] = dataclasses.field(default_factory=dict) @@ -265,6 +275,58 @@ def from_file( continue return textcov + + @classmethod + def from_file_raw( + cls, + file_handle, + ignore_function_patterns: Optional[List[re.Pattern]] = None) -> Textcov: + """Read a textcov from a file handle.""" + if ignore_function_patterns is None: + ignore_function_patterns = [] + + textcov = cls() + textcov.language = 'c++' + + current_function_name: str = '' + current_function: RawFunctionReport = RawFunctionReport() + try: + demangled = demangle(cls._read_file_with_fallback(file_handle)) + except Exception as e: + logger.warning('Decoding failure: %s', e) + demangled = '' + + for line in demangled.split('\n'): + match = FUNCTION_PATTERN.match(line) + if match: + # Normalize templates. + current_function_name = normalize_template_args(match.group(1)) + if any( + p.match(current_function_name) for p in ignore_function_patterns): + # Ignore this function. + current_function_name = '' + continue + + if current_function_name in textcov.raw_coverage_report: + current_function = textcov.raw_coverage_report[current_function_name] + else: + current_function = RawFunctionReport(name=current_function_name) + textcov.raw_coverage_report[current_function_name] = current_function + + continue + + if not current_function_name: + # No current functions. This can happen if we're currently in an + # ignored function. + continue + + match = LINE_PATTERN.match(line) + if match: + current_function.append_line(line) + continue + + return textcov + @classmethod def from_python_file(cls, file_handle) -> Textcov: """Read a textcov from a all_cov.json file for python.""" @@ -494,6 +556,19 @@ def subtract_covered_lines(self, other: Textcov): self.functions[function.name].subtract_covered_lines( function, self.language) + def get_coverage_reports(self, function_name: str) -> str: + """Get raw coverage report for a function.""" + + coverage_reports = [] + + for function in self.raw_coverage_report.keys(): + if function_name in function: + report = self.raw_coverage_report[function] + coverage_text = report.name + ':\n' + '\n'.join(report.lines) + coverage_reports.append(coverage_text) + + return '\n\n'.join(coverage_reports) + @property def covered_lines(self): if self.language == 'python': diff --git a/experiment/workdir.py b/experiment/workdir.py index 8327809ea1..d92137c22e 100644 --- a/experiment/workdir.py +++ b/experiment/workdir.py @@ -20,6 +20,8 @@ from shutil import rmtree from typing import Optional +import logger + class WorkDirs: """Working directories.""" @@ -80,6 +82,17 @@ def code_coverage_report(self, benchmark) -> str: benchmark_coverage = os.path.join(coverage_dir, benchmark) return benchmark_coverage + def textcov_report(self, trial: int) -> str: + code_coverage_report_dir = self.code_coverage_report(f'{trial:02d}.fuzz_target') + textcov_dir = os.path.join(code_coverage_report_dir, 'textcov') + logger.info('Looking for textcov report in %s', textcov_dir, trial=trial) + if not os.path.exists(textcov_dir): + return '' + for filename in os.listdir(textcov_dir): + if filename.endswith(".covreport"): + return os.path.join(textcov_dir, filename) + return '' + @property def status(self) -> str: return os.path.join(self._base_dir, 'status') diff --git a/llm_toolkit/prompt_builder.py b/llm_toolkit/prompt_builder.py index 3319444453..864b1a8599 100644 --- a/llm_toolkit/prompt_builder.py +++ b/llm_toolkit/prompt_builder.py @@ -691,7 +691,7 @@ def build(self, example_pair: list[list[str]], project_example_content: Optional[list[list[str]]] = None, project_context_content: Optional[dict] = None, - tool_guides: str = '', + tool_guides: list[str] = [], project_dir: str = '', function_requirements: str = '') -> prompts.Prompt: """Constructs a prompt using the templates in |self| and saves it.""" @@ -707,7 +707,7 @@ def build(self, prompt = prompt.replace('{FUNCTION_SIGNATURE}', self.benchmark.function_signature) prompt = prompt.replace('{FUZZ_TARGET}', self.run_result.fuzz_target_source) - prompt = prompt.replace('{TOOL_GUIDES}', tool_guides) + prompt = prompt.replace('{TOOL_GUIDES}', '\n'.join(tool_guides)) prompt = prompt.replace('{FUZZING_LOG}', self.run_result.run_log) prompt = prompt.replace('{FUNCTION_REQUIREMENTS}', function_requirements) diff --git a/prompts/agent/coverage-analyzer-priming.txt b/prompts/agent/coverage-analyzer-priming.txt index 0c8f409726..35985730ac 100644 --- a/prompts/agent/coverage-analyzer-priming.txt +++ b/prompts/agent/coverage-analyzer-priming.txt @@ -1,8 +1,8 @@ You are a professional cybersecurity expert researcher aiming to analyze a fuzz target to cover more code of the function-under-test. The fuzz target is written in {LANGUAGE}, designed to fuzz function {FUNCTION_SIGNATURE} in project {PROJECT}. The project source code is at {PROJECT_DIR}, mainly written in {PROJECT_LANGUAGE}. Here is the fuzz target and the fuzzing log. -Your task is to investigate why it has a low coverage, identify uncovered blocks of code that is reachable from the target function but cannot be covered by the existing fuzz target, and determine how to enhance the fuzz target to increase code coverage in the target function and other functions it calls. -Note the fuzz target can already build, but has a low coverage at runtime. +Your task is to analyze the coverage results of the function-under-test, identify uncovered blocks of code that is reachable from the target function but cannot be covered by the existing fuzz target, and determine how to enhance the fuzz target to increase code coverage in the target function and other functions it calls. +Note that you should not propose to cover a specific function by adding a direct call to that function in the fuzz target. The coverage must be increased when the fuzz target calls the target function. {FUZZ_TARGET} diff --git a/prompts/tool/coverage_tool.txt b/prompts/tool/coverage_tool.txt new file mode 100644 index 0000000000..e45c8b9c37 --- /dev/null +++ b/prompts/tool/coverage_tool.txt @@ -0,0 +1,34 @@ + +**Coverage tool Guide** +Use the coverage tool to access the coverage report for specific functions, produced during fuzzing. +This will help you understand the specific lines of code that are covered or not covered by the fuzz target. + + +1. STRICTLY Only One function name per message: + * Send only the function name. DO NOT append parenthesis or include the function's signature. + * **DO NOT** send multiple functions in each message. +2. Execute Request Message Structure: + * Reason for the Request: + * Explain the reason for requesting coverage report for the specific function. + * Wrap this explanation within and tags. + * Coverage Request: + * Provide the name of the function you need its coverage report. + * Wrap the request with and tags. + * Format Example: + + I want to retrieve the coverage report for the function_name function, so I can understand what lines are not yet covered. + + + function_name + +3. Receiving Bash Command Output Message Structure: + * Bash execution outputs will be returned in the following format: + + [The function you requested.] + + + [Extracted snippet containing the function's coverage report.] + + + + diff --git a/tool/coverage_tool.py b/tool/coverage_tool.py new file mode 100644 index 0000000000..97078a8749 --- /dev/null +++ b/tool/coverage_tool.py @@ -0,0 +1,35 @@ + + +import os +from typing import Any +import logging + +from experiment.benchmark import Benchmark +from experiment.textcov import Textcov +from tool.base_tool import BaseTool + + +logger = logging.getLogger(__name__) + +class CoverageTool(BaseTool): + """A tool that provides LLM agents access to code coverage reports.""" + + def __init__(self, + benchmark: Benchmark, + coverage_report_path: str) -> None: + super().__init__(benchmark) + if coverage_report_path and os.path.exists(coverage_report_path): + with open(coverage_report_path, 'rb') as file: + self.coverage_report = Textcov.from_file_raw(file) + else: + self.coverage_report = None + + def tutorial(self) -> str: + """Constructs a tool guide tutorial for LLM agents.""" + return self._get_tutorial_file_content('coverage_tool.txt') + + def execute(self, command: str) -> Any: + """Executes the coverage tool based on the command.""" + if not self.coverage_report: + return 'Coverage report not available.' + return self.coverage_report.get_coverage_reports(command) \ No newline at end of file From 8cf9129d979293d619f545de38e978871d3e2301 Mon Sep 17 00:00:00 2001 From: Paschal Amusuo Date: Sat, 15 Nov 2025 21:01:16 -0500 Subject: [PATCH 2/5] Fixed presubmit errors --- agent/base_agent.py | 1 - agent/coverage_analyzer.py | 42 +++++++++++++-------------- agent_tests/coverage_analyzer_test.py | 41 ++++++++++++++++++-------- experiment/textcov.py | 11 +++---- experiment/workdir.py | 7 +++-- llm_toolkit/prompt_builder.py | 2 +- tool/coverage_tool.py | 26 ++++++++++++----- 7 files changed, 79 insertions(+), 51 deletions(-) diff --git a/agent/base_agent.py b/agent/base_agent.py index 6a083350fb..5d2d5927b3 100644 --- a/agent/base_agent.py +++ b/agent/base_agent.py @@ -14,7 +14,6 @@ """The abstract base class for LLM agents in stages.""" import argparse import asyncio -import json import os import random import re diff --git a/agent/coverage_analyzer.py b/agent/coverage_analyzer.py index 7364f5667f..0e4cde4b77 100644 --- a/agent/coverage_analyzer.py +++ b/agent/coverage_analyzer.py @@ -50,8 +50,10 @@ def _initial_prompt(self, results: list[Result]) -> Prompt: builder = CoverageAnalyzerTemplateBuilder(self.llm, benchmark, last_result) prompt = builder.build(example_pair=[], - tool_guides=[self.inspect_tool.tutorial(), - self.coverage_tool.tutorial()], + tool_guides=[ + self.inspect_tool.tutorial(), + self.coverage_tool.tutorial() + ], project_dir=self.inspect_tool.project_dir, function_requirements=function_requirements) # TODO: A different file name/dir. @@ -77,8 +79,9 @@ def _container_handle_conclusion(self, cur_round: int, response: str, return None - def _container_handle_coverage_request(self, response: str, tool: CoverageTool, - prompt: Prompt) -> Prompt: + def _container_handle_coverage_request(self, response: str, + tool: CoverageTool, + prompt: Prompt) -> Prompt: """Executes coverage tool commands from LLM response.""" command = self._parse_tag(response, 'coverage-request') if not command: @@ -86,14 +89,12 @@ def _container_handle_coverage_request(self, response: str, tool: CoverageTool, logger.info('Executing coverage command: %s', command, trial=self.trial) coverage_report = tool.execute(command) or 'No coverage report found.' - tool_result = ( - '' - f'{command}' - '' - '' - f'{coverage_report}' - '' - ) + tool_result = ('' + f'{command}' + '' + '' + f'{coverage_report}' + '') prompt.append(tool_result) return prompt @@ -104,12 +105,13 @@ def _container_tool_reaction( del run_result prompt = prompt_builder.DefaultTemplateBuilder(self.llm, None).build([]) - if self._parse_tags(response, 'bash'): + if self._parse_tags(response, 'bash'): prompt = self._container_handle_bash_commands(response, self.inspect_tool, - prompt) + prompt) elif self._parse_tags(response, 'coverage-request'): - prompt = self._container_handle_coverage_request( - response, self.coverage_tool, prompt) + prompt = self._container_handle_coverage_request(response, + self.coverage_tool, + prompt) # Only report conclusion when no more bash investigation is required. if not prompt.gettext(): # Then build fuzz target. @@ -150,15 +152,13 @@ def execute(self, result_history: list[Result]) -> AnalysisResult: last_result.trial, self.inspect_tool.container_id, trial=last_result.trial) - textcov_report = self.args.work_dirs.textcov_report( - last_result.trial) + textcov_report = self.args.work_dirs.textcov_report(last_result.trial) if not textcov_report or not os.path.exists(textcov_report): logger.error('Textcov report not found at %s', textcov_report, trial=last_result.trial) - - self.coverage_tool = CoverageTool(benchmark, - textcov_report) + + self.coverage_tool = CoverageTool(benchmark, textcov_report) cur_round = 1 coverage_result = CoverageResult() prompt = self._initial_prompt(result_history) diff --git a/agent_tests/coverage_analyzer_test.py b/agent_tests/coverage_analyzer_test.py index 16907cd42a..278afe1205 100644 --- a/agent_tests/coverage_analyzer_test.py +++ b/agent_tests/coverage_analyzer_test.py @@ -1,4 +1,20 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Class for executing CoverageAnalyzer agent directly.""" + import os + from agent_tests.base_agent_test import BaseAgentTest from results import CoverageResult, RunResult @@ -27,9 +43,10 @@ def setup_initial_result_list(self, benchmark, prompt): covreports.append(file_path) if covreports: - textcov_dir = os.path.join(self.args.work_dirs.code_coverage_report( - f'{self.trial:02d}.fuzz-target'), 'textcov') - + textcov_dir = os.path.join( + self.args.work_dirs.code_coverage_report( + f'{self.trial:02d}.fuzz-target'), 'textcov') + os.makedirs(textcov_dir, exist_ok=True) dst_file_path = os.path.join(textcov_dir, os.path.basename(covreports[0])) @@ -38,12 +55,12 @@ def setup_initial_result_list(self, benchmark, prompt): dst_file.write(file.read()) run_result = RunResult(benchmark=benchmark, - trial=self.trial, - work_dirs=self.args.work_dirs, - author=None, - chat_history={}, - crashes=False, - fuzz_target_source=fuzz_target_source, - run_log=fuzzing_log) - - return [run_result] \ No newline at end of file + trial=self.trial, + work_dirs=self.args.work_dirs, + author=None, + chat_history={}, + crashes=False, + fuzz_target_source=fuzz_target_source, + run_log=fuzzing_log) + + return [run_result] diff --git a/experiment/textcov.py b/experiment/textcov.py index c8eda68e5b..3a6a51ec20 100644 --- a/experiment/textcov.py +++ b/experiment/textcov.py @@ -140,6 +140,7 @@ def subtract_covered_lines(self, other: Function, language: str = 'c++'): if line.hit_count and line.contents in self.lines: del self.lines[line.contents] + @dataclasses.dataclass class RawFunctionReport: """Represents a function report in a textcov.""" @@ -149,6 +150,7 @@ class RawFunctionReport: def append_line(self, line: str): self.lines.append(line) + @dataclasses.dataclass class File: """Represents a file in a textcov, only for Python.""" @@ -185,7 +187,8 @@ class Textcov: # For JVM / C / C++ / Rust functions: dict[str, Function] = dataclasses.field(default_factory=dict) # Normalized function name -> extracted coverage reports for function. - raw_coverage_report: dict[str, RawFunctionReport] = dataclasses.field(default_factory=dict) + raw_coverage_report: dict[str, RawFunctionReport] = dataclasses.field( + default_factory=dict) # File name -> File object. # For Python files: dict[str, File] = dataclasses.field(default_factory=dict) @@ -275,7 +278,6 @@ def from_file( continue return textcov - @classmethod def from_file_raw( cls, @@ -560,10 +562,9 @@ def get_coverage_reports(self, function_name: str) -> str: """Get raw coverage report for a function.""" coverage_reports = [] - - for function in self.raw_coverage_report.keys(): + + for function, report in self.raw_coverage_report.items(): if function_name in function: - report = self.raw_coverage_report[function] coverage_text = report.name + ':\n' + '\n'.join(report.lines) coverage_reports.append(coverage_text) diff --git a/experiment/workdir.py b/experiment/workdir.py index d92137c22e..d1d1f74ff8 100644 --- a/experiment/workdir.py +++ b/experiment/workdir.py @@ -83,14 +83,15 @@ def code_coverage_report(self, benchmark) -> str: return benchmark_coverage def textcov_report(self, trial: int) -> str: - code_coverage_report_dir = self.code_coverage_report(f'{trial:02d}.fuzz_target') + code_coverage_report_dir = self.code_coverage_report( + f'{trial:02d}.fuzz_target') textcov_dir = os.path.join(code_coverage_report_dir, 'textcov') logger.info('Looking for textcov report in %s', textcov_dir, trial=trial) if not os.path.exists(textcov_dir): return '' for filename in os.listdir(textcov_dir): - if filename.endswith(".covreport"): - return os.path.join(textcov_dir, filename) + if filename.endswith(".covreport"): + return os.path.join(textcov_dir, filename) return '' @property diff --git a/llm_toolkit/prompt_builder.py b/llm_toolkit/prompt_builder.py index 864b1a8599..e357ba1ba4 100644 --- a/llm_toolkit/prompt_builder.py +++ b/llm_toolkit/prompt_builder.py @@ -248,7 +248,7 @@ def _add_examples(self, example_content: Optional[list[list[str]]] = None): """Constructs the |example_files| to be used in the prompt.""" # Estimate prompt size so far. - prompt_size = self._model.estimate_token_num(self._prompt.get()) + prompt_size = self._model.estimate_token_num(self._prompt.gettext()) # Estimate space needed for the final problem. final_problem_prompt = self._prompt.create_prompt_piece( final_problem, 'user') diff --git a/tool/coverage_tool.py b/tool/coverage_tool.py index 97078a8749..33781fc3f3 100644 --- a/tool/coverage_tool.py +++ b/tool/coverage_tool.py @@ -1,22 +1,32 @@ - - +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""A tool for LLM agents to interact with coverage reports.""" +import logging import os from typing import Any -import logging from experiment.benchmark import Benchmark from experiment.textcov import Textcov from tool.base_tool import BaseTool - logger = logging.getLogger(__name__) + class CoverageTool(BaseTool): """A tool that provides LLM agents access to code coverage reports.""" - def __init__(self, - benchmark: Benchmark, - coverage_report_path: str) -> None: + def __init__(self, benchmark: Benchmark, coverage_report_path: str) -> None: super().__init__(benchmark) if coverage_report_path and os.path.exists(coverage_report_path): with open(coverage_report_path, 'rb') as file: @@ -32,4 +42,4 @@ def execute(self, command: str) -> Any: """Executes the coverage tool based on the command.""" if not self.coverage_report: return 'Coverage report not available.' - return self.coverage_report.get_coverage_reports(command) \ No newline at end of file + return self.coverage_report.get_coverage_reports(command) From e0ba10abf762915abe88d4b994911011d5ff6157 Mon Sep 17 00:00:00 2001 From: Paschal Amusuo Date: Sat, 15 Nov 2025 22:29:48 -0500 Subject: [PATCH 3/5] Presubmit fixes --- agent/coverage_analyzer.py | 6 ++---- llm_toolkit/prompt_builder.py | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/agent/coverage_analyzer.py b/agent/coverage_analyzer.py index 0e4cde4b77..1cce15784c 100644 --- a/agent/coverage_analyzer.py +++ b/agent/coverage_analyzer.py @@ -49,11 +49,9 @@ def _initial_prompt(self, results: list[Result]) -> Prompt: function_requirements = self.get_function_requirements() builder = CoverageAnalyzerTemplateBuilder(self.llm, benchmark, last_result) + tool_guides = f'{self.inspect_tool.tutorial()}\n\n{self.coverage_tool.tutorial()}' prompt = builder.build(example_pair=[], - tool_guides=[ - self.inspect_tool.tutorial(), - self.coverage_tool.tutorial() - ], + tool_guides=tool_guides, project_dir=self.inspect_tool.project_dir, function_requirements=function_requirements) # TODO: A different file name/dir. diff --git a/llm_toolkit/prompt_builder.py b/llm_toolkit/prompt_builder.py index e357ba1ba4..c24678cced 100644 --- a/llm_toolkit/prompt_builder.py +++ b/llm_toolkit/prompt_builder.py @@ -691,7 +691,7 @@ def build(self, example_pair: list[list[str]], project_example_content: Optional[list[list[str]]] = None, project_context_content: Optional[dict] = None, - tool_guides: list[str] = [], + tool_guides: str = '', project_dir: str = '', function_requirements: str = '') -> prompts.Prompt: """Constructs a prompt using the templates in |self| and saves it.""" @@ -707,7 +707,7 @@ def build(self, prompt = prompt.replace('{FUNCTION_SIGNATURE}', self.benchmark.function_signature) prompt = prompt.replace('{FUZZ_TARGET}', self.run_result.fuzz_target_source) - prompt = prompt.replace('{TOOL_GUIDES}', '\n'.join(tool_guides)) + prompt = prompt.replace('{TOOL_GUIDES}', tool_guides) prompt = prompt.replace('{FUZZING_LOG}', self.run_result.run_log) prompt = prompt.replace('{FUNCTION_REQUIREMENTS}', function_requirements) From 478948ad6482e3b858836e01109cb3d68d50d281 Mon Sep 17 00:00:00 2001 From: Paschal Amusuo Date: Sun, 16 Nov 2025 22:27:04 -0500 Subject: [PATCH 4/5] Presubmit fixes --- prompts/tool/coverage_tool.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prompts/tool/coverage_tool.txt b/prompts/tool/coverage_tool.txt index e45c8b9c37..1789488e1d 100644 --- a/prompts/tool/coverage_tool.txt +++ b/prompts/tool/coverage_tool.txt @@ -1,6 +1,6 @@ **Coverage tool Guide** -Use the coverage tool to access the coverage report for specific functions, produced during fuzzing. +Use the coverage tool to access the coverage report for specific functions, produced during fuzzing. This will help you understand the specific lines of code that are covered or not covered by the fuzz target. From 8d5120d155756737d2f5c96266ae8dd2817828ec Mon Sep 17 00:00:00 2001 From: Paschal Amusuo Date: Mon, 17 Nov 2025 10:09:46 -0500 Subject: [PATCH 5/5] Fix docker version issue --- common/cloud_builder.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/cloud_builder.py b/common/cloud_builder.py index 58719f1d19..9d1e431dd2 100644 --- a/common/cloud_builder.py +++ b/common/cloud_builder.py @@ -348,6 +348,8 @@ def _request_cloud_build(self, ofg_repo_url: str, agent_dill_url: str, 'GOOGLE_CLOUD_LOCATION=' + os.getenv("GOOGLE_CLOUD_LOCATION", "global"), '--network=cloudbuild', + '-e', + 'DOCKER_API_VERSION=1.41', # Built from this repo's `Dockerfile.cloudbuild-agent`. ('us-central1-docker.pkg.dev/oss-fuzz/oss-fuzz-gen/' 'agent-image'),