diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 948fdaf..51241be 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -46,7 +46,7 @@ jobs: python -m pip install . - name: Test run: | - python -m pytest + PYTHONPATH=. python -m pytest release-linux: if: github.event_name == 'push' && contains(github.ref, 'refs/tags/') diff --git a/scripts/run-etcd.sh b/scripts/run-etcd.sh new file mode 100644 index 0000000..69f83cf --- /dev/null +++ b/scripts/run-etcd.sh @@ -0,0 +1,23 @@ +ETCD_VER=v3.5.14 + +rm -rf /tmp/etcd-data.tmp && mkdir -p /tmp/etcd-data.tmp && \ + docker run -d \ + -p 2379:2379 \ + -p 2380:2380 \ + --mount type=bind,source=/tmp/etcd-data.tmp,destination=/etcd-data \ + --name etcd-gcr-${ETCD_VER} \ + gcr.io/etcd-development/etcd:${ETCD_VER} \ + /usr/local/bin/etcd \ + --name s1 \ + --data-dir /etcd-data \ + --listen-client-urls http://0.0.0.0:2379 \ + --advertise-client-urls http://0.0.0.0:2379 \ + --listen-peer-urls http://0.0.0.0:2380 \ + --initial-advertise-peer-urls http://0.0.0.0:2380 \ + --initial-cluster s1=http://0.0.0.0:2380 \ + --initial-cluster-token tkn \ + --initial-cluster-state new \ + --log-level info \ + --logger zap \ + --log-outputs stderr + diff --git a/src/client.rs b/src/client.rs index 54b1b9c..7ccdbb3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -116,6 +116,7 @@ impl PyClient { result } + #[pyo3(signature = ())] fn __aenter__<'a>(&'a mut self, py: Python<'a>) -> PyResult> { let endpoints = self.endpoints.clone(); let connect_options = self.connect_options.clone(); diff --git a/tests/conftest.py b/tests/conftest.py index 9edccaa..70f69a3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,15 +29,16 @@ def etcd_container(): with DockerContainer( f"gcr.io/etcd-development/etcd:{ETCD_VER}", command=_etcd_command, - ).with_bind_ports("2379/tcp", 2379) as container: + ).with_exposed_ports(2379) as container: wait_for_logs(container, "ready to serve client requests") - yield + yield container @pytest.fixture async def etcd(etcd_container): + etcd_port = etcd_container.get_exposed_port(2379) etcd = AsyncEtcd( - addr=HostPortPair(host="127.0.0.1", port=2379), + addr=HostPortPair(host="127.0.0.1", port=etcd_port), namespace="test", scope_prefix_map={ ConfigScopes.GLOBAL: "global", @@ -45,31 +46,31 @@ async def etcd(etcd_container): ConfigScopes.NODE: "node/i-test", }, ) - try: - await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL) - await etcd.delete_prefix("", scope=ConfigScopes.SGROUP) - await etcd.delete_prefix("", scope=ConfigScopes.NODE) - yield etcd - finally: - await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL) - await etcd.delete_prefix("", scope=ConfigScopes.SGROUP) - await etcd.delete_prefix("", scope=ConfigScopes.NODE) - await etcd.close() - del etcd + async with etcd: + try: + await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL) + await etcd.delete_prefix("", scope=ConfigScopes.SGROUP) + await etcd.delete_prefix("", scope=ConfigScopes.NODE) + yield etcd + finally: + await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL) + await etcd.delete_prefix("", scope=ConfigScopes.SGROUP) + await etcd.delete_prefix("", scope=ConfigScopes.NODE) @pytest.fixture async def gateway_etcd(etcd_container): + etcd_port = etcd_container.get_exposed_port(2379) etcd = AsyncEtcd( - addr=HostPortPair(host="127.0.0.1", port=2379), + addr=HostPortPair(host="127.0.0.1", port=etcd_port), namespace="test", scope_prefix_map={ ConfigScopes.GLOBAL: "", }, - ) - try: - await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL) - yield etcd - finally: - await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL) - del etcd + ) + async with etcd: + try: + await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL) + yield etcd + finally: + await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL) diff --git a/tests/harness.py b/tests/harness.py index 1488a86..fce7155 100644 --- a/tests/harness.py +++ b/tests/harness.py @@ -9,6 +9,7 @@ import functools import logging from collections import ChainMap, namedtuple +from types import TracebackType from typing import ( AsyncGenerator, AsyncIterator, @@ -165,8 +166,24 @@ def __init__( connect_options=self._connect_options, ) - async def close(self): - pass # for backward compatibility + async def open(self) -> None: + await self.etcd.__aenter__() + + async def close(self) -> None: + await self.etcd.__aexit__() + + async def __aenter__(self) -> "AsyncEtcd": + await self.etcd.__aenter__() + return self + + async def __aexit__( + self, + exc_type: Optional[type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> Optional[bool]: + ret = await self.etcd.__aexit__(exc_type, exc_val, exc_tb) + return ret def _mangle_key(self, k: str) -> str: if k.startswith("/"): @@ -604,4 +621,4 @@ async def watch_prefix( await asyncio.sleep(self.watch_reconnect_intvl) ended_without_error = False else: - raise e \ No newline at end of file + raise e diff --git a/tests/test.py b/tests/test.py index 9ffe073..da24615 100644 --- a/tests/test.py +++ b/tests/test.py @@ -246,6 +246,68 @@ async def _record_prefix(): assert records_prefix[3].value == "" +@pytest.mark.asyncio +async def test_subprocess_segfault_reproduction(etcd_container) -> None: + """Test case to reproduce segfault when subprocess terminates quickly.""" + import subprocess + import sys + import tempfile + import os + + # Create a script that will be run in subprocess + script_content = ''' +import asyncio +import sys + +from tests.harness import AsyncEtcd, ConfigScopes, HostPortPair + +async def main(etcd_port): + etcd = AsyncEtcd( + addr=HostPortPair(host="127.0.0.1", port=etcd_port), + namespace="test_subprocess", + scope_prefix_map={ + ConfigScopes.GLOBAL: "global", + }, + ) + + # Write a key and immediately exit + async with etcd: + await etcd.put("test_key", "test_value") + +if __name__ == "__main__": + etcd_port = int(sys.argv[1]) + asyncio.run(main(etcd_port)) +''' + etcd_port = etcd_container.get_exposed_port(2379) + + # Write the script to a temporary file + with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: + f.write(script_content) + script_path = f.name + + try: + # Run the subprocess 5 times to reproduce the segfault + for i in range(5): + result = subprocess.run( + [sys.executable, "-u", script_path, str(etcd_port)], + capture_output=True, + text=True, + timeout=10, + ) + + # Check if the subprocess completed successfully + if result.returncode != 0: + print(f"Subprocess {i+1} failed with return code {result.returncode}") + print(f"stderr: {result.stderr}") + print(f"stdout: {result.stdout}") + + assert result.returncode == 0, f"Subprocess {i+1} failed with return code {result.returncode}" + + finally: + # Clean up the temporary script file + os.unlink(script_path) + + @pytest.mark.asyncio async def test_watch_once(etcd: AsyncEtcd) -> None: records = []