Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion agent/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""The abstract base class for LLM agents in stages."""
import argparse
import asyncio
import json
import os
import random
import re
Expand Down
43 changes: 40 additions & 3 deletions agent/coverage_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from llm_toolkit.prompts import Prompt
from results import AnalysisResult, CoverageResult, Result, RunResult
from tool.container_tool import ProjectContainerTool
from tool.coverage_tool import CoverageTool

INVALID_PRMOT_PATH = os.path.join('prompts', 'agent',
'coverage-analyzer-invalid-response.txt')
Expand All @@ -48,8 +49,9 @@ def _initial_prompt(self, results: list[Result]) -> Prompt:
function_requirements = self.get_function_requirements()

builder = CoverageAnalyzerTemplateBuilder(self.llm, benchmark, last_result)
tool_guides = f'{self.inspect_tool.tutorial()}\n\n{self.coverage_tool.tutorial()}'
prompt = builder.build(example_pair=[],
tool_guides=self.inspect_tool.tutorial(),
tool_guides=tool_guides,
project_dir=self.inspect_tool.project_dir,
function_requirements=function_requirements)
# TODO: A different file name/dir.
Expand All @@ -75,15 +77,39 @@ def _container_handle_conclusion(self, cur_round: int, response: str,

return None

def _container_handle_coverage_request(self, response: str,
                                       tool: CoverageTool,
                                       prompt: Prompt) -> Prompt:
  """Runs the coverage request found in |response| and records its result.

  Args:
    response: The raw LLM response, searched for a <coverage-request> tag.
    tool: The coverage tool used to look up the requested report.
    prompt: The prompt that receives the tool output.

  Returns:
    |prompt|, extended with the request and its report when a request was
    present, otherwise returned untouched.
  """
  requested = self._parse_tag(response, 'coverage-request')
  if requested:
    logger.info('Executing coverage command: %s', requested, trial=self.trial)
    report = tool.execute(requested)
    if not report:
      # The tool produced nothing for this request; tell the LLM explicitly.
      report = 'No coverage report found.'
    prompt.append(f'<function>{requested}</function>'
                  f'<report>{report}</report>')
  return prompt

def _container_tool_reaction(
self, cur_round: int, response: str, run_result: RunResult,
coverage_result: CoverageResult) -> Optional[Prompt]:
"""Validates LLM conclusion or executes its command."""
del run_result
prompt = prompt_builder.DefaultTemplateBuilder(self.llm, None).build([])

prompt = self._container_handle_bash_commands(response, self.inspect_tool,
prompt)
if self._parse_tags(response, 'bash'):
prompt = self._container_handle_bash_commands(response, self.inspect_tool,
prompt)
elif self._parse_tags(response, 'coverage-request'):
prompt = self._container_handle_coverage_request(response,
self.coverage_tool,
prompt)
# Only report conclusion when no more bash investigation is required.
if not prompt.gettext():
# Then build fuzz target.
Expand Down Expand Up @@ -120,6 +146,17 @@ def execute(self, result_history: list[Result]) -> AnalysisResult:
content=last_result.build_script_source,
file_path=self.inspect_tool.build_script_path)
self.inspect_tool.compile(extra_commands=' && rm -rf /out/* > /dev/null')
logger.info('This is trial %d, container id: %s',
last_result.trial,
self.inspect_tool.container_id,
trial=last_result.trial)
textcov_report = self.args.work_dirs.textcov_report(last_result.trial)
if not textcov_report or not os.path.exists(textcov_report):
logger.error('Textcov report not found at %s',
textcov_report,
trial=last_result.trial)

self.coverage_tool = CoverageTool(benchmark, textcov_report)
cur_round = 1
coverage_result = CoverageResult()
prompt = self._initial_prompt(result_history)
Expand Down
66 changes: 66 additions & 0 deletions agent_tests/coverage_analyzer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class for executing CoverageAnalyzer agent directly."""

import os

from agent_tests.base_agent_test import BaseAgentTest
from results import CoverageResult, RunResult


class CoverageAnalyzerAgentTest(BaseAgentTest):
  """Test for the CoverageAnalyzer agent."""

  def setup_initial_result_list(self, benchmark, prompt):
    """Sets up the initial result list for the CoverageAnalyzer agent test.

    Extracts the fuzz target, fuzzing log and function requirements from
    |prompt|, stages the first `.covreport` file found under
    `self.args.additional_files_path` into the trial's textcov directory, and
    wraps everything into a single RunResult.

    Args:
      benchmark: The benchmark the agent is executed on.
      prompt: The prompt text holding the tagged sections.

    Returns:
      A one-element list containing the constructed RunResult.
    """

    # Get necessary data from prompt
    fuzz_target_source = self._parse_tag(prompt, 'fuzz target')
    fuzzing_log = self._parse_tag(prompt, 'fuzzing log')
    function_requirements = self._parse_tag(prompt, 'function-requirements')

    if function_requirements:
      # Save to requirements file
      self.write_requirements_to_file(self.args, function_requirements)

    # Walk through the directory to find coverage report files
    covreports = []
    for root, _, files in os.walk(self.args.additional_files_path):
      covreports.extend(
          os.path.join(root, name)
          for name in files
          if name.endswith('.covreport'))

    if covreports:
      # The directory name must be '<trial>.fuzz_target' (underscore): that is
      # where WorkDirs.textcov_report() looks for the report.
      textcov_dir = os.path.join(
          self.args.work_dirs.code_coverage_report(
              f'{self.trial:02d}.fuzz_target'), 'textcov')

      os.makedirs(textcov_dir, exist_ok=True)
      dst_file_path = os.path.join(textcov_dir, os.path.basename(covreports[0]))

      with open(covreports[0], 'rb') as src_file:
        with open(dst_file_path, 'wb') as dst_file:
          dst_file.write(src_file.read())

    run_result = RunResult(benchmark=benchmark,
                           trial=self.trial,
                           work_dirs=self.args.work_dirs,
                           author=None,
                           chat_history={},
                           crashes=False,
                           fuzz_target_source=fuzz_target_source,
                           run_log=fuzzing_log)

    return [run_result]
2 changes: 2 additions & 0 deletions common/cloud_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ def _request_cloud_build(self, ofg_repo_url: str, agent_dill_url: str,
'GOOGLE_CLOUD_LOCATION=' +
os.getenv("GOOGLE_CLOUD_LOCATION", "global"),
'--network=cloudbuild',
'-e',
'DOCKER_API_VERSION=1.41',
# Built from this repo's `Dockerfile.cloudbuild-agent`.
('us-central1-docker.pkg.dev/oss-fuzz/oss-fuzz-gen/'
'agent-image'),
Expand Down
76 changes: 76 additions & 0 deletions experiment/textcov.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ def subtract_covered_lines(self, other: Function, language: str = 'c++'):
del self.lines[line.contents]


@dataclasses.dataclass
class RawFunctionReport:
  """Raw textcov lines collected for a single function."""
  # Function name this report belongs to.
  name: str = ''
  # Coverage lines, stored in the order they were appended.
  lines: list[str] = dataclasses.field(default_factory=list)

  def append_line(self, line: str):
    """Adds |line| to the end of this function's report."""
    self.lines.append(line)


@dataclasses.dataclass
class File:
"""Represents a file in a textcov, only for Python."""
Expand Down Expand Up @@ -176,6 +186,9 @@ class Textcov:
# Function name -> Function object.
# For JVM / C / C++ / Rust
functions: dict[str, Function] = dataclasses.field(default_factory=dict)
# Normalized function name -> extracted coverage reports for function.
raw_coverage_report: dict[str, RawFunctionReport] = dataclasses.field(
default_factory=dict)
# File name -> File object.
# For Python
files: dict[str, File] = dataclasses.field(default_factory=dict)
Expand Down Expand Up @@ -265,6 +278,57 @@ def from_file(
continue
return textcov

@classmethod
def from_file_raw(
    cls,
    file_handle,
    ignore_function_patterns: Optional[List[re.Pattern]] = None) -> Textcov:
  """Reads a textcov from a file handle, keeping the raw line text.

  Lines matching FUNCTION_PATTERN open a function section; the following lines
  that match LINE_PATTERN are stored unmodified in that function's
  RawFunctionReport. Sections whose normalized name repeats are merged into
  the same report.

  Args:
    file_handle: Handle of the textcov file to read.
    ignore_function_patterns: Patterns matched against the normalized function
      name; sections of matching functions are dropped.

  Returns:
    A Textcov whose |raw_coverage_report| maps normalized function names to
    their raw reports. It is empty if the file could not be decoded.
  """
  ignored = ([] if ignore_function_patterns is None else
             ignore_function_patterns)

  result = cls()
  result.language = 'c++'

  try:
    text = demangle(cls._read_file_with_fallback(file_handle))
  except Exception as e:
    logger.warning('Decoding failure: %s', e)
    text = ''

  # An empty name means "not inside a tracked function" (e.g. an ignored one).
  active_name: str = ''
  active_report: RawFunctionReport = RawFunctionReport()
  for raw_line in text.split('\n'):
    header = FUNCTION_PATTERN.match(raw_line)
    if header:
      # Normalize templates.
      active_name = normalize_template_args(header.group(1))
      if any(p.match(active_name) for p in ignored):
        # Ignore this function.
        active_name = ''
      else:
        # Reuse the existing report if this function was already seen.
        active_report = result.raw_coverage_report.setdefault(
            active_name, RawFunctionReport(name=active_name))
      continue

    if active_name and LINE_PATTERN.match(raw_line):
      active_report.append_line(raw_line)

  return result

@classmethod
def from_python_file(cls, file_handle) -> Textcov:
"""Read a textcov from a all_cov.json file for python."""
Expand Down Expand Up @@ -494,6 +558,18 @@ def subtract_covered_lines(self, other: Textcov):
self.functions[function.name].subtract_covered_lines(
function, self.language)

def get_coverage_reports(self, function_name: str) -> str:
"""Get raw coverage report for a function."""

coverage_reports = []

for function, report in self.raw_coverage_report.items():
if function_name in function:
coverage_text = report.name + ':\n' + '\n'.join(report.lines)
coverage_reports.append(coverage_text)

return '\n\n'.join(coverage_reports)

@property
def covered_lines(self):
if self.language == 'python':
Expand Down
14 changes: 14 additions & 0 deletions experiment/workdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from shutil import rmtree
from typing import Optional

import logger


class WorkDirs:
"""Working directories."""
Expand Down Expand Up @@ -80,6 +82,18 @@ def code_coverage_report(self, benchmark) -> str:
benchmark_coverage = os.path.join(coverage_dir, benchmark)
return benchmark_coverage

def textcov_report(self, trial: int) -> str:
  """Returns the path of the textcov report of |trial|, or '' if there is none.

  The report is searched for in the 'textcov' sub-directory of the code
  coverage report directory of '<trial>.fuzz_target'.

  Args:
    trial: The trial number, rendered as a zero-padded two-digit prefix.

  Returns:
    The path of the first '.covreport' file in sorted order, or '' when the
    directory is missing or holds no such file.
  """
  code_coverage_report_dir = self.code_coverage_report(
      f'{trial:02d}.fuzz_target')
  textcov_dir = os.path.join(code_coverage_report_dir, 'textcov')
  logger.info('Looking for textcov report in %s', textcov_dir, trial=trial)
  if not os.path.isdir(textcov_dir):
    return ''
  # os.listdir() yields entries in arbitrary order; sort so the same report is
  # selected on every call when several are present.
  for filename in sorted(os.listdir(textcov_dir)):
    if filename.endswith('.covreport'):
      return os.path.join(textcov_dir, filename)
  return ''

@property
def status(self) -> str:
return os.path.join(self._base_dir, 'status')
Expand Down
2 changes: 1 addition & 1 deletion llm_toolkit/prompt_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def _add_examples(self,
example_content: Optional[list[list[str]]] = None):
"""Constructs the |example_files| to be used in the prompt."""
# Estimate prompt size so far.
prompt_size = self._model.estimate_token_num(self._prompt.get())
prompt_size = self._model.estimate_token_num(self._prompt.gettext())
# Estimate space needed for the final problem.
final_problem_prompt = self._prompt.create_prompt_piece(
final_problem, 'user')
Expand Down
4 changes: 2 additions & 2 deletions prompts/agent/coverage-analyzer-priming.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
You are a professional cybersecurity expert researcher aiming to analyze a fuzz target to cover more code of the function-under-test.
The fuzz target is written in {LANGUAGE}, designed to fuzz function {FUNCTION_SIGNATURE} in project {PROJECT}. The project source code is at {PROJECT_DIR}, mainly written in {PROJECT_LANGUAGE}.
Here is the fuzz target and the fuzzing log.
Your task is to investigate why it has a low coverage, identify uncovered blocks of code that is reachable from the target function but cannot be covered by the existing fuzz target, and determine how to enhance the fuzz target to increase code coverage in the target function and other functions it calls.
Note the fuzz target can already build, but has a low coverage at runtime.
Your task is to analyze the coverage results of the function-under-test, identify uncovered blocks of code that are reachable from the target function but cannot be covered by the existing fuzz target, and determine how to enhance the fuzz target to increase code coverage in the target function and other functions it calls.
Note that you should not propose to cover a specific function by adding a direct call to that function in the fuzz target. The coverage must be increased when the fuzz target calls the target function.

<fuzz target>
{FUZZ_TARGET}
Expand Down
34 changes: 34 additions & 0 deletions prompts/tool/coverage_tool.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<tool>
**Coverage tool Guide**
Use the coverage tool to access the coverage report for specific functions, produced during fuzzing.
This will help you understand the specific lines of code that are covered or not covered by the fuzz target.

<interaction protocols>
1. STRICTLY Only One function name per message:
* Send only the function name. DO NOT append parentheses or include the function's signature.
* **DO NOT** send multiple functions in each message.
2. Execute Request Message Structure:
* Reason for the Request:
* Explain the reason for requesting coverage report for the specific function.
* Wrap this explanation within <reason> and </reason> tags.
* Coverage Request:
* Provide the name of the function whose coverage report you need.
* Wrap the request with <coverage-request> and </coverage-request> tags.
* Format Example:
<reason>
I want to retrieve the coverage report for the function_name function, so I can understand what lines are not yet covered.
</reason>
<coverage-request>
function_name
</coverage-request>
3. Receiving Coverage Report Message Structure:
* Coverage reports will be returned in the following format:
<function>
[The function you requested.]
</function>
<report>
[Extracted snippet containing the function's coverage report.]
</report>
</interaction protocols>

</tool>
45 changes: 45 additions & 0 deletions tool/coverage_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A tool for LLM agents to interact with coverage reports."""
import logging
import os
from typing import Any

from experiment.benchmark import Benchmark
from experiment.textcov import Textcov
from tool.base_tool import BaseTool

logger = logging.getLogger(__name__)


class CoverageTool(BaseTool):
  """Gives LLM agents read access to a parsed code coverage report."""

  def __init__(self, benchmark: Benchmark, coverage_report_path: str) -> None:
    """Loads the coverage report at |coverage_report_path| if it exists.

    Args:
      benchmark: The benchmark this tool is bound to.
      coverage_report_path: Path of the textcov report; may be empty or point
        to a missing file, in which case no report is loaded.
    """
    super().__init__(benchmark)
    self.coverage_report = None
    if not coverage_report_path or not os.path.exists(coverage_report_path):
      return
    with open(coverage_report_path, 'rb') as report_file:
      self.coverage_report = Textcov.from_file_raw(report_file)

  def tutorial(self) -> str:
    """Returns the usage guide of this tool for LLM agents."""
    return self._get_tutorial_file_content('coverage_tool.txt')

  def execute(self, command: str) -> Any:
    """Looks up the coverage report for the function named by |command|.

    Returns:
      The text produced by the loaded report for |command|, or a fixed notice
      when no report was loaded.
    """
    report = self.coverage_report
    if report:
      return report.get_coverage_reports(command)
    return 'Coverage report not available.'