From 1ac1a796e0c298f2ee743e9da372bb9a69816da5 Mon Sep 17 00:00:00 2001 From: herbison Date: Tue, 5 May 2026 16:17:22 +0000 Subject: [PATCH 1/9] Rewrite `kaggle kernels logs --follow` to use SSE log stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous `--follow` implementation polled the persisted log blob, which is only written at session end — so it never showed live output for in-progress notebooks. Switch to the midtier SSE proxy (mirroring how the editor/viewer FE consume logs), parsing `data:` events and stopping on the `STREAM_END` sentinel. Co-authored-by: kaggle-agent --- src/kaggle/api/kaggle_api_extended.py | 137 +++++++++------ src/kaggle/cli.py | 4 +- tests/test_kernels_logs.py | 230 +++++++++++++++----------- 3 files changed, 228 insertions(+), 143 deletions(-) diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index 4b4ef26f..725e7ca4 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -165,7 +165,7 @@ ApiKernelMetadata, ApiDeleteKernelRequest, ) -from kagglesdk.kernels.types.kernels_enums import KernelWorkerStatus, KernelsListSortType, KernelsListViewType +from kagglesdk.kernels.types.kernels_enums import KernelsListSortType, KernelsListViewType from kagglesdk.models.types.model_api_service import ( ApiListModelsRequest, ApiCreateModelRequest, @@ -201,7 +201,7 @@ from enum import EnumMeta from requests.exceptions import HTTPError from requests.models import Response -from typing import Callable, cast, Dict, List, Mapping, Optional, Tuple, Union, TypeVar, Iterable +from typing import Callable, cast, Dict, Iterator, List, Mapping, Optional, Tuple, Union, TypeVar, Iterable T = TypeVar("T") @@ -4596,16 +4596,7 @@ def kernels_logs(self, kernel: str) -> str: Returns: str: The log content from the kernel's latest session. 
""" - if kernel is None: - raise ValueError("A kernel must be specified") - if "/" in kernel: - self.validate_kernel_string(kernel) - kernel_url_list = kernel.split("/") - owner_slug = kernel_url_list[0] - kernel_slug = kernel_url_list[1] - else: - owner_slug = self.get_config_value(self.CONFIG_NAME_USER) or "" - kernel_slug = kernel + owner_slug, kernel_slug = self._split_kernel(kernel) with self.build_kaggle_client() as kaggle: request = ApiListKernelSessionOutputRequest() @@ -4623,52 +4614,100 @@ def kernels_logs(self, kernel: str) -> str: raise return response.log or "" - def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=5): + def _split_kernel(self, kernel: str) -> Tuple[str, str]: + """Split a kernel identifier into (owner_slug, kernel_slug).""" + if kernel is None: + raise ValueError("A kernel must be specified") + if "/" in kernel: + self.validate_kernel_string(kernel) + owner_slug, kernel_slug = kernel.split("/", 1) + return owner_slug, kernel_slug + owner_slug = self.get_config_value(self.CONFIG_NAME_USER) or "" + return owner_slug, kernel + + # Sentinel value emitted by the streaming endpoint to signal end-of-stream. + _LOG_STREAM_END_SENTINEL = "STREAM_END" + + def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]: + """Stream live execution logs for a kernel via the midtier SSE proxy. + + The midtier endpoint (`GET /api/v1/kernels/{owner}/{slug}/logs/stream`) + resolves the kernel's active session, waits for the upstream LogsURL to + be published, and pipes the SSE feed back to us. Each event payload is + a JSON object with `stream_name`, `time`, and `data` keys; the server + terminates the stream by emitting a `STREAM_END` sentinel. + + Args: + kernel: The kernel identifier in the format owner/kernel-slug. + + Yields: + Dict[str, str]: Parsed event payloads. 
+ """ + owner_slug, kernel_slug = self._split_kernel(kernel) + + with self.build_kaggle_client() as kaggle: + http = kaggle._http_client + http._init_session() + base = http._endpoint if http._env == KaggleEnv.PROD else f"{http._endpoint}/api" + url = f"{base}/v1/kernels/{owner_slug}/{kernel_slug}/logs/stream" + + headers = dict(http._session.headers) + headers["Accept"] = "text/event-stream" + headers.pop("Content-Type", None) + + try: + response = http._session.get(url, stream=True, headers=headers, auth=http._session.auth) + response.raise_for_status() + except HTTPError as e: + if e.response is not None and e.response.status_code in (401, 403): + raise ValueError( + f"Cannot stream logs for kernel '{kernel}' " + "(Permission 'kernels.get' was denied). " + "The most likely cause is a wrong kernel slug. " + "Use the slug from the notebook URL (kaggle.com/code/owner/KERNEL-SLUG)." + ) + raise + + try: + for raw_line in response.iter_lines(decode_unicode=True): + if not raw_line or not raw_line.startswith("data:"): + continue + payload = raw_line[len("data:") :].lstrip() + if payload == self._LOG_STREAM_END_SENTINEL: + return + try: + yield json.loads(payload) + except json.JSONDecodeError: + yield {"data": payload} + finally: + response.close() + + def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=None): """Print kernel execution logs to stdout. + In one-shot mode (default) prints the persisted log blob for the + kernel's latest session. In `--follow` mode attaches to the midtier + SSE log stream and prints log lines as they are produced by the + running session, exiting when the server signals end-of-stream. + Args: kernel: The kernel for which to retrieve the logs. kernel_opt: An alternative option to providing a kernel. - follow: If True, continuously poll and print new log lines. - interval: Polling interval in seconds for follow mode (default 5). + follow: If True, attach to the live log stream. 
+ interval: Deprecated; retained for CLI backwards compatibility. """ + del interval # No longer used; live streaming is push-based. kernel = kernel or kernel_opt - terminal_statuses = { - KernelWorkerStatus.COMPLETE, - KernelWorkerStatus.ERROR, - KernelWorkerStatus.CANCEL_ACKNOWLEDGED, - } - printed_lines = 0 - - while True: - log = self.kernels_logs(kernel) - lines = log.split("\n") if log else [] - if follow: - new_lines = lines[printed_lines:] - if new_lines: - print("\n".join(new_lines), flush=True) - printed_lines = len(lines) + if not follow: + print(self.kernels_logs(kernel)) + return - # Check if the kernel has reached a terminal status - try: - status_response = self.kernels_status(kernel) - status = status_response.status - except Exception: - break - if status in terminal_statuses: - # Fetch final logs one more time - log = self.kernels_logs(kernel) - lines = log.split("\n") if log else [] - final_new_lines = lines[printed_lines:] - if final_new_lines: - print("\n".join(final_new_lines), flush=True) - break - - time.sleep(interval) - else: - print(log) - break + for event in self.kernels_logs_stream(kernel): + data = event.get("data") + if data is None: + continue + print(data, flush=True, end="" if data.endswith("\n") else "\n") def model_get(self, model: str) -> ApiModel: """Gets a model. diff --git a/src/kaggle/cli.py b/src/kaggle/cli.py index 316e38e4..d00887b8 100644 --- a/src/kaggle/cli.py +++ b/src/kaggle/cli.py @@ -1960,8 +1960,8 @@ class Help(object): "Regex pattern to match against filenames. Only files matching the pattern will be downloaded." 
) param_kernel_acc = "Specify the type of accelerator to use for the kernel run" - param_kernel_logs_follow = "Continuously poll and print new log lines (like tail -f)" - param_kernel_logs_interval = "Polling interval in seconds for follow mode (default 5)" + param_kernel_logs_follow = "Stream live execution logs from the running session (like tail -f)" + param_kernel_logs_interval = argparse.SUPPRESS # Deprecated; live streaming is push-based. # Models params param_model = "Model URL suffix in format /" diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py index 1431b8e6..8abf31d6 100644 --- a/tests/test_kernels_logs.py +++ b/tests/test_kernels_logs.py @@ -1,26 +1,41 @@ # coding=utf-8 -import unittest -from unittest.mock import patch, MagicMock, call import io +import json import sys +import unittest +from unittest.mock import MagicMock, patch sys.path.insert(0, "..") from kaggle.api.kaggle_api_extended import KaggleApi -from kagglesdk.kernels.types.kernels_enums import KernelWorkerStatus + + +def _sse_lines(events): + """Encode a list of event payloads as SSE `data:` lines.""" + out = [] + for evt in events: + if isinstance(evt, str): + out.append(f"data: {evt}") + else: + out.append(f"data: {json.dumps(evt)}") + out.append("") # SSE event terminator + return out class TestKernelsLogs(unittest.TestCase): - """Tests for the kernels_logs and kernels_logs_cli methods.""" + """Tests for the kernels_logs / kernels_logs_stream / kernels_logs_cli methods.""" def setUp(self): self.api = KaggleApi.__new__(KaggleApi) self.api.config_values = {"username": "testuser"} + # ------------------------------------------------------------------ + # kernels_logs (one-shot, persisted blob) + # ------------------------------------------------------------------ + @patch.object(KaggleApi, "build_kaggle_client") @patch.object(KaggleApi, "validate_kernel_string") def test_kernels_logs_returns_log_string(self, mock_validate, mock_client): - """Test that kernels_logs returns 
the log string from the API response.""" mock_response = MagicMock() mock_response.log = "Line 1\nLine 2\nLine 3" mock_kaggle = MagicMock() @@ -34,7 +49,6 @@ def test_kernels_logs_returns_log_string(self, mock_validate, mock_client): @patch.object(KaggleApi, "build_kaggle_client") @patch.object(KaggleApi, "validate_kernel_string") def test_kernels_logs_returns_empty_string_when_no_log(self, mock_validate, mock_client): - """Test that kernels_logs returns empty string when log is None.""" mock_response = MagicMock() mock_response.log = None mock_kaggle = MagicMock() @@ -45,16 +59,13 @@ def test_kernels_logs_returns_empty_string_when_no_log(self, mock_validate, mock result = self.api.kernels_logs("owner/kernel-slug") self.assertEqual(result, "") - @patch.object(KaggleApi, "build_kaggle_client") - def test_kernels_logs_raises_when_kernel_none(self, mock_client): - """Test that kernels_logs raises ValueError when kernel is None.""" + def test_kernels_logs_raises_when_kernel_none(self): with self.assertRaises(ValueError): self.api.kernels_logs(None) @patch.object(KaggleApi, "build_kaggle_client") @patch.object(KaggleApi, "get_config_value", return_value="defaultuser") def test_kernels_logs_uses_default_user_for_bare_slug(self, mock_config, mock_client): - """Test that a bare kernel slug uses the default username.""" mock_response = MagicMock() mock_response.log = "some log" mock_kaggle = MagicMock() @@ -65,138 +76,173 @@ def test_kernels_logs_uses_default_user_for_bare_slug(self, mock_config, mock_cl result = self.api.kernels_logs("my-kernel") self.assertEqual(result, "some log") - # Verify the request used the default user call_args = mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.call_args request = call_args[0][0] self.assertEqual(request.user_name, "defaultuser") self.assertEqual(request.kernel_slug, "my-kernel") + # ------------------------------------------------------------------ + # kernels_logs_stream (live SSE) + # 
------------------------------------------------------------------ + + def _make_streaming_kaggle_client(self, response_mock): + http_session = MagicMock() + http_session.headers = {"User-Agent": "test", "Content-Type": "application/json"} + http_session.auth = None + http_session.get.return_value = response_mock + + http_client = MagicMock() + http_client._session = http_session + http_client._endpoint = "http://localhost" + # Force the non-PROD code path so we exercise the `/api` prefix. + http_client._env = MagicMock(name="LOCAL") + + kaggle = MagicMock() + kaggle._http_client = http_client + + cm = MagicMock() + cm.__enter__ = MagicMock(return_value=kaggle) + cm.__exit__ = MagicMock(return_value=False) + return cm, http_session + + @patch.object(KaggleApi, "build_kaggle_client") + @patch.object(KaggleApi, "validate_kernel_string") + def test_kernels_logs_stream_yields_events_and_stops_on_sentinel(self, _validate, mock_client): + events = [ + {"stream_name": "stdout", "time": "t1", "data": "hello"}, + {"stream_name": "stderr", "time": "t2", "data": "warn"}, + "STREAM_END", + # Anything after the sentinel must be ignored. + {"stream_name": "stdout", "time": "t3", "data": "ignored"}, + ] + response = MagicMock() + response.iter_lines.return_value = iter(_sse_lines(events)) + response.raise_for_status = MagicMock() + + cm, http_session = self._make_streaming_kaggle_client(response) + # Force PROD path off + with patch("kaggle.api.kaggle_api_extended.KaggleEnv") as mock_env: + mock_env.PROD = "PROD" + cm.__enter__.return_value._http_client._env = "LOCAL" + mock_client.return_value = cm + + result = list(self.api.kernels_logs_stream("owner/kernel-slug")) + + self.assertEqual(len(result), 2) + self.assertEqual(result[0]["data"], "hello") + self.assertEqual(result[1]["data"], "warn") + + # Verify URL and SSE Accept header. 
+ call_args = http_session.get.call_args + url = call_args[0][0] + self.assertEqual(url, "http://localhost/api/v1/kernels/owner/kernel-slug/logs/stream") + self.assertEqual(call_args.kwargs["headers"]["Accept"], "text/event-stream") + self.assertNotIn("Content-Type", call_args.kwargs["headers"]) + self.assertTrue(call_args.kwargs["stream"]) + response.close.assert_called_once() + + @patch.object(KaggleApi, "build_kaggle_client") + @patch.object(KaggleApi, "validate_kernel_string") + def test_kernels_logs_stream_handles_non_json_payload(self, _validate, mock_client): + response = MagicMock() + response.iter_lines.return_value = iter(["data: not-json", "", "data: STREAM_END", ""]) + response.raise_for_status = MagicMock() + + cm, _ = self._make_streaming_kaggle_client(response) + with patch("kaggle.api.kaggle_api_extended.KaggleEnv") as mock_env: + mock_env.PROD = "PROD" + cm.__enter__.return_value._http_client._env = "LOCAL" + mock_client.return_value = cm + result = list(self.api.kernels_logs_stream("owner/kernel-slug")) + + self.assertEqual(result, [{"data": "not-json"}]) + + def test_kernels_logs_stream_raises_when_kernel_none(self): + with self.assertRaises(ValueError): + list(self.api.kernels_logs_stream(None)) + + # ------------------------------------------------------------------ + # kernels_logs_cli + # ------------------------------------------------------------------ + @patch.object(KaggleApi, "kernels_logs") def test_kernels_logs_cli_oneshot(self, mock_logs): - """Test one-shot mode prints log to stdout.""" mock_logs.return_value = "Line 1\nLine 2\nDone" - captured = io.StringIO() sys.stdout = captured try: self.api.kernels_logs_cli("owner/kernel-slug") finally: sys.stdout = sys.__stdout__ - self.assertEqual(captured.getvalue(), "Line 1\nLine 2\nDone\n") + mock_logs.assert_called_once_with("owner/kernel-slug") @patch.object(KaggleApi, "kernels_logs") def test_kernels_logs_cli_uses_kernel_opt(self, mock_logs): - """Test that kernel_opt is used when kernel 
is None.""" mock_logs.return_value = "log output" - captured = io.StringIO() sys.stdout = captured try: self.api.kernels_logs_cli(None, kernel_opt="owner/kernel-slug") finally: sys.stdout = sys.__stdout__ - mock_logs.assert_called_once_with("owner/kernel-slug") - @patch("time.sleep") - @patch.object(KaggleApi, "kernels_status") - @patch.object(KaggleApi, "kernels_logs") - def test_kernels_logs_cli_follow_mode(self, mock_logs, mock_status, mock_sleep): - """Test follow mode polls and prints new lines, stops on terminal status.""" - # First poll: kernel is running, returns some log lines - # Second poll: kernel is complete, returns more log lines - mock_logs.side_effect = [ - "Line 1\nLine 2", - "Line 1\nLine 2\nLine 3\nLine 4", - "Line 1\nLine 2\nLine 3\nLine 4", # final fetch after terminal status - ] - - status_running = MagicMock() - status_running.status = KernelWorkerStatus.RUNNING - status_complete = MagicMock() - status_complete.status = KernelWorkerStatus.COMPLETE - mock_status.side_effect = [status_running, status_complete] - + @patch.object(KaggleApi, "kernels_logs_stream") + def test_kernels_logs_cli_follow_streams_events(self, mock_stream): + mock_stream.return_value = iter( + [ + {"stream_name": "stdout", "time": "t1", "data": "hello"}, + {"stream_name": "stderr", "time": "t2", "data": "warn\n"}, + {"stream_name": "stdout", "time": "t3", "data": "bye"}, + ] + ) captured = io.StringIO() sys.stdout = captured try: - self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=1) + self.api.kernels_logs_cli("owner/kernel-slug", follow=True) finally: sys.stdout = sys.__stdout__ - output = captured.getvalue() - # First poll prints "Line 1\nLine 2" - # Second poll prints "Line 3\nLine 4" - self.assertIn("Line 1", output) - self.assertIn("Line 2", output) - self.assertIn("Line 3", output) - self.assertIn("Line 4", output) - - # Verify sleep was called with the right interval - mock_sleep.assert_called_with(1) - - @patch("time.sleep") - 
@patch.object(KaggleApi, "kernels_status") - @patch.object(KaggleApi, "kernels_logs") - def test_kernels_logs_cli_follow_stops_on_error(self, mock_logs, mock_status, mock_sleep): - """Test follow mode stops when kernel status is ERROR.""" - mock_logs.side_effect = [ - "Line 1", - "Line 1", # final fetch - ] - - status_error = MagicMock() - status_error.status = KernelWorkerStatus.ERROR - mock_status.return_value = status_error - + # Each event's data is printed; lines without trailing newline get one. + self.assertEqual(captured.getvalue(), "hello\nwarn\nbye\n") + mock_stream.assert_called_once_with("owner/kernel-slug") + + @patch.object(KaggleApi, "kernels_logs_stream") + def test_kernels_logs_cli_follow_skips_events_without_data(self, mock_stream): + mock_stream.return_value = iter( + [ + {"stream_name": "stdout", "time": "t1"}, + {"stream_name": "stdout", "time": "t2", "data": "only-line"}, + ] + ) captured = io.StringIO() sys.stdout = captured try: - self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=1) + self.api.kernels_logs_cli("owner/kernel-slug", follow=True) finally: sys.stdout = sys.__stdout__ - - # Should only poll once before stopping - self.assertEqual(mock_status.call_count, 1) - - @patch("time.sleep") - @patch.object(KaggleApi, "kernels_status") - @patch.object(KaggleApi, "kernels_logs") - def test_kernels_logs_cli_follow_stops_on_cancel(self, mock_logs, mock_status, mock_sleep): - """Test follow mode stops when kernel status is CANCEL_ACKNOWLEDGED.""" - mock_logs.side_effect = [ - "Cancelled", - "Cancelled", # final fetch - ] - - status_cancel = MagicMock() - status_cancel.status = KernelWorkerStatus.CANCEL_ACKNOWLEDGED - mock_status.return_value = status_cancel - - captured = io.StringIO() - sys.stdout = captured - try: - self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=1) - finally: - sys.stdout = sys.__stdout__ - - self.assertEqual(mock_status.call_count, 1) + self.assertEqual(captured.getvalue(), 
"only-line\n") @patch.object(KaggleApi, "kernels_logs") def test_kernels_logs_cli_empty_log(self, mock_logs): - """Test one-shot mode with empty log.""" mock_logs.return_value = "" - captured = io.StringIO() sys.stdout = captured try: self.api.kernels_logs_cli("owner/kernel-slug") finally: sys.stdout = sys.__stdout__ - self.assertEqual(captured.getvalue(), "\n") + @patch.object(KaggleApi, "kernels_logs_stream") + @patch.object(KaggleApi, "kernels_logs") + def test_kernels_logs_cli_interval_is_ignored(self, mock_logs, mock_stream): + # `interval` is retained for backwards compatibility but no longer used. + mock_stream.return_value = iter([]) + self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=42) + mock_logs.assert_not_called() + if __name__ == "__main__": unittest.main() From d231a7054f0789ed02a2f9e5d329a8bc40c14998 Mon Sep 17 00:00:00 2001 From: herbison Date: Tue, 5 May 2026 16:26:07 +0000 Subject: [PATCH 2/9] Use correct END_OF_LOG sentinel for log stream Per review feedback, the SSE stream terminates with `END_OF_LOG`, not `STREAM_END`. Updated the sentinel constant, docstring, and test fixtures to match the actual server behavior. Co-authored-by: kaggle-agent --- src/kaggle/api/kaggle_api_extended.py | 4 ++-- tests/test_kernels_logs.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index 725e7ca4..322426d0 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -4626,7 +4626,7 @@ def _split_kernel(self, kernel: str) -> Tuple[str, str]: return owner_slug, kernel # Sentinel value emitted by the streaming endpoint to signal end-of-stream. - _LOG_STREAM_END_SENTINEL = "STREAM_END" + _LOG_STREAM_END_SENTINEL = "END_OF_LOG" def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]: """Stream live execution logs for a kernel via the midtier SSE proxy. 
@@ -4635,7 +4635,7 @@ def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]: resolves the kernel's active session, waits for the upstream LogsURL to be published, and pipes the SSE feed back to us. Each event payload is a JSON object with `stream_name`, `time`, and `data` keys; the server - terminates the stream by emitting a `STREAM_END` sentinel. + terminates the stream by emitting an `END_OF_LOG` sentinel. Args: kernel: The kernel identifier in the format owner/kernel-slug. diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py index 8abf31d6..6492e9bb 100644 --- a/tests/test_kernels_logs.py +++ b/tests/test_kernels_logs.py @@ -111,7 +111,7 @@ def test_kernels_logs_stream_yields_events_and_stops_on_sentinel(self, _validate events = [ {"stream_name": "stdout", "time": "t1", "data": "hello"}, {"stream_name": "stderr", "time": "t2", "data": "warn"}, - "STREAM_END", + "END_OF_LOG", # Anything after the sentinel must be ignored. {"stream_name": "stdout", "time": "t3", "data": "ignored"}, ] @@ -145,7 +145,7 @@ def test_kernels_logs_stream_yields_events_and_stops_on_sentinel(self, _validate @patch.object(KaggleApi, "validate_kernel_string") def test_kernels_logs_stream_handles_non_json_payload(self, _validate, mock_client): response = MagicMock() - response.iter_lines.return_value = iter(["data: not-json", "", "data: STREAM_END", ""]) + response.iter_lines.return_value = iter(["data: not-json", "", "data: END_OF_LOG", ""]) response.raise_for_status = MagicMock() cm, _ = self._make_streaming_kaggle_client(response) From f92a3fe5271fbb3da098f6c0c6d36d3e3ab944d4 Mon Sep 17 00:00:00 2001 From: herbison Date: Tue, 5 May 2026 17:26:05 +0000 Subject: [PATCH 3/9] Add persisted-blob fallback to kernel logs stream for completed sessions `kernels_logs_stream` now branches on `Content-Type`, using `_iter_sse_events` for live SSE and `_iter_blob_lines` for the persisted GCS blob returned after the session finishes. 
Co-authored-by: kaggle-agent --- src/kaggle/api/kaggle_api_extended.py | 51 +++++++++++++++-------- tests/test_kernels_logs.py | 60 ++++++++++++++++++++++++--- 2 files changed, 89 insertions(+), 22 deletions(-) diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index 322426d0..bbe905ea 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -4629,13 +4629,15 @@ def _split_kernel(self, kernel: str) -> Tuple[str, str]: _LOG_STREAM_END_SENTINEL = "END_OF_LOG" def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]: - """Stream live execution logs for a kernel via the midtier SSE proxy. + """Stream execution logs for a kernel via the midtier logs endpoint. - The midtier endpoint (`GET /api/v1/kernels/{owner}/{slug}/logs/stream`) - resolves the kernel's active session, waits for the upstream LogsURL to - be published, and pipes the SSE feed back to us. Each event payload is - a JSON object with `stream_name`, `time`, and `data` keys; the server - terminates the stream by emitting an `END_OF_LOG` sentinel. + `GET /api/v1/kernels/{owner}/{slug}/logs/stream` adapts to the session + state: while the session is running it proxies the upstream SSE feed + (`Content-Type: text/event-stream`, JSON `{stream_name, time, data}` + events terminated by an `END_OF_LOG` sentinel); once the session is + done it returns the persisted log blob from GCS with a non-SSE + content type. We branch on `Content-Type` and yield uniform + `{"data": ...}` events either way. Args: kernel: The kernel identifier in the format owner/kernel-slug. 
@@ -4652,7 +4654,7 @@ def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]: url = f"{base}/v1/kernels/{owner_slug}/{kernel_slug}/logs/stream" headers = dict(http._session.headers) - headers["Accept"] = "text/event-stream" + headers["Accept"] = "text/event-stream, */*" headers.pop("Content-Type", None) try: @@ -4669,19 +4671,34 @@ def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]: raise try: - for raw_line in response.iter_lines(decode_unicode=True): - if not raw_line or not raw_line.startswith("data:"): - continue - payload = raw_line[len("data:") :].lstrip() - if payload == self._LOG_STREAM_END_SENTINEL: - return - try: - yield json.loads(payload) - except json.JSONDecodeError: - yield {"data": payload} + content_type = (response.headers.get("Content-Type") or "").lower() + if content_type.startswith("text/event-stream"): + yield from self._iter_sse_events(response) + else: + yield from self._iter_blob_lines(response) finally: response.close() + def _iter_sse_events(self, response) -> Iterator[Dict[str, str]]: + """Parse `data:` lines from a live SSE response, stopping on the sentinel.""" + for raw_line in response.iter_lines(decode_unicode=True): + if not raw_line or not raw_line.startswith("data:"): + continue + payload = raw_line[len("data:") :].lstrip() + if payload == self._LOG_STREAM_END_SENTINEL: + return + try: + yield json.loads(payload) + except json.JSONDecodeError: + yield {"data": payload} + + def _iter_blob_lines(self, response) -> Iterator[Dict[str, str]]: + """Yield one event per line from a non-SSE blob (completed session fallback).""" + for raw_line in response.iter_lines(decode_unicode=True): + if raw_line is None: + continue + yield {"data": raw_line} + def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=None): """Print kernel execution logs to stdout. 
diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py index 6492e9bb..2b2c3219 100644 --- a/tests/test_kernels_logs.py +++ b/tests/test_kernels_logs.py @@ -105,6 +105,22 @@ def _make_streaming_kaggle_client(self, response_mock): cm.__exit__ = MagicMock(return_value=False) return cm, http_session + @staticmethod + def _sse_response(events): + response = MagicMock() + response.headers = {"Content-Type": "text/event-stream"} + response.iter_lines.return_value = iter(_sse_lines(events)) + response.raise_for_status = MagicMock() + return response + + @staticmethod + def _blob_response(lines, content_type="text/plain"): + response = MagicMock() + response.headers = {"Content-Type": content_type} + response.iter_lines.return_value = iter(lines) + response.raise_for_status = MagicMock() + return response + @patch.object(KaggleApi, "build_kaggle_client") @patch.object(KaggleApi, "validate_kernel_string") def test_kernels_logs_stream_yields_events_and_stops_on_sentinel(self, _validate, mock_client): @@ -115,9 +131,7 @@ def test_kernels_logs_stream_yields_events_and_stops_on_sentinel(self, _validate # Anything after the sentinel must be ignored. {"stream_name": "stdout", "time": "t3", "data": "ignored"}, ] - response = MagicMock() - response.iter_lines.return_value = iter(_sse_lines(events)) - response.raise_for_status = MagicMock() + response = self._sse_response(events) cm, http_session = self._make_streaming_kaggle_client(response) # Force PROD path off @@ -132,11 +146,11 @@ def test_kernels_logs_stream_yields_events_and_stops_on_sentinel(self, _validate self.assertEqual(result[0]["data"], "hello") self.assertEqual(result[1]["data"], "warn") - # Verify URL and SSE Accept header. + # Verify URL and Accept header (advertise SSE but accept anything for blob fallback). 
call_args = http_session.get.call_args url = call_args[0][0] self.assertEqual(url, "http://localhost/api/v1/kernels/owner/kernel-slug/logs/stream") - self.assertEqual(call_args.kwargs["headers"]["Accept"], "text/event-stream") + self.assertEqual(call_args.kwargs["headers"]["Accept"], "text/event-stream, */*") self.assertNotIn("Content-Type", call_args.kwargs["headers"]) self.assertTrue(call_args.kwargs["stream"]) response.close.assert_called_once() @@ -145,6 +159,7 @@ def test_kernels_logs_stream_yields_events_and_stops_on_sentinel(self, _validate @patch.object(KaggleApi, "validate_kernel_string") def test_kernels_logs_stream_handles_non_json_payload(self, _validate, mock_client): response = MagicMock() + response.headers = {"Content-Type": "text/event-stream"} response.iter_lines.return_value = iter(["data: not-json", "", "data: END_OF_LOG", ""]) response.raise_for_status = MagicMock() @@ -157,6 +172,41 @@ def test_kernels_logs_stream_handles_non_json_payload(self, _validate, mock_clie self.assertEqual(result, [{"data": "not-json"}]) + @patch.object(KaggleApi, "build_kaggle_client") + @patch.object(KaggleApi, "validate_kernel_string") + def test_kernels_logs_stream_falls_back_to_blob_for_completed_session(self, _validate, mock_client): + # When the session is done the midtier returns the persisted GCS blob + # with a non-SSE content type. We should yield one event per line. 
+ response = self._blob_response(["line one", "line two", "line three"]) + + cm, _ = self._make_streaming_kaggle_client(response) + with patch("kaggle.api.kaggle_api_extended.KaggleEnv") as mock_env: + mock_env.PROD = "PROD" + cm.__enter__.return_value._http_client._env = "LOCAL" + mock_client.return_value = cm + result = list(self.api.kernels_logs_stream("owner/kernel-slug")) + + self.assertEqual( + result, + [{"data": "line one"}, {"data": "line two"}, {"data": "line three"}], + ) + response.close.assert_called_once() + + @patch.object(KaggleApi, "build_kaggle_client") + @patch.object(KaggleApi, "validate_kernel_string") + def test_kernels_logs_stream_blob_fallback_with_octet_stream(self, _validate, mock_client): + # GCS blobs may come back as application/octet-stream; same handling. + response = self._blob_response(["only-line"], content_type="application/octet-stream") + + cm, _ = self._make_streaming_kaggle_client(response) + with patch("kaggle.api.kaggle_api_extended.KaggleEnv") as mock_env: + mock_env.PROD = "PROD" + cm.__enter__.return_value._http_client._env = "LOCAL" + mock_client.return_value = cm + result = list(self.api.kernels_logs_stream("owner/kernel-slug")) + + self.assertEqual(result, [{"data": "only-line"}]) + def test_kernels_logs_stream_raises_when_kernel_none(self): with self.assertRaises(ValueError): list(self.api.kernels_logs_stream(None)) From 77b07d5b67914dd91bd53ad67ffa3a5546e14d26 Mon Sep 17 00:00:00 2001 From: herbison Date: Tue, 16 Jun 2026 17:54:18 +0000 Subject: [PATCH 4/9] Fix kernel logs stream URL to match midtier slug binding The midtier `additional_bindings` for `GetKernelSessionLogsStream` registers the slug-mode route as `/api/v1/kernels/logs/stream/ {user_name}/{kernel_slug}` (PR #43349), with the slug pair after `logs/stream` to match the other kernels output endpoints. The client was calling `/v1/kernels/{owner}/{slug}/logs/stream`, which would 404 in production. 
Update the URL, docstring, and test assertion to the correct path. Co-authored-by: kaggle-agent --- src/kaggle/api/kaggle_api_extended.py | 4 ++-- tests/test_kernels_logs.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index bbe905ea..a48f5b70 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -4631,7 +4631,7 @@ def _split_kernel(self, kernel: str) -> Tuple[str, str]: def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]: """Stream execution logs for a kernel via the midtier logs endpoint. - `GET /api/v1/kernels/{owner}/{slug}/logs/stream` adapts to the session + `GET /api/v1/kernels/logs/stream/{owner}/{slug}` adapts to the session state: while the session is running it proxies the upstream SSE feed (`Content-Type: text/event-stream`, JSON `{stream_name, time, data}` events terminated by an `END_OF_LOG` sentinel); once the session is @@ -4651,7 +4651,7 @@ def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]: http = kaggle._http_client http._init_session() base = http._endpoint if http._env == KaggleEnv.PROD else f"{http._endpoint}/api" - url = f"{base}/v1/kernels/{owner_slug}/{kernel_slug}/logs/stream" + url = f"{base}/v1/kernels/logs/stream/{owner_slug}/{kernel_slug}" headers = dict(http._session.headers) headers["Accept"] = "text/event-stream, */*" diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py index 2b2c3219..3aec5a9d 100644 --- a/tests/test_kernels_logs.py +++ b/tests/test_kernels_logs.py @@ -149,7 +149,7 @@ def test_kernels_logs_stream_yields_events_and_stops_on_sentinel(self, _validate # Verify URL and Accept header (advertise SSE but accept anything for blob fallback). 
call_args = http_session.get.call_args url = call_args[0][0] - self.assertEqual(url, "http://localhost/api/v1/kernels/owner/kernel-slug/logs/stream") + self.assertEqual(url, "http://localhost/api/v1/kernels/logs/stream/owner/kernel-slug") self.assertEqual(call_args.kwargs["headers"]["Accept"], "text/event-stream, */*") self.assertNotIn("Content-Type", call_args.kwargs["headers"]) self.assertTrue(call_args.kwargs["stream"]) From cca1a8053ab21b06f90909200557229e2d57ef53 Mon Sep 17 00:00:00 2001 From: herbison Date: Tue, 16 Jun 2026 18:00:34 +0000 Subject: [PATCH 5/9] Resolve merge conflicts with main Take main's `parse_kernel_string` + None-check in `kernels_logs` to match the convention used by the other kernel functions, and merge main's new `kernels_output` paging tests alongside this branch's SSE log-stream tests (keeping both `tempfile` and `json` imports). Co-authored-by: kaggle-agent --- src/kaggle/api/kaggle_api_extended.py | 4 +- tests/test_kernels_logs.py | 79 +++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index a48f5b70..ae2fba16 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -4596,7 +4596,9 @@ def kernels_logs(self, kernel: str) -> str: Returns: str: The log content from the kernel's latest session. 
""" - owner_slug, kernel_slug = self._split_kernel(kernel) + if kernel is None: + raise ValueError("A kernel must be specified") + owner_slug, kernel_slug, version = self.parse_kernel_string(kernel) with self.build_kaggle_client() as kaggle: request = ApiListKernelSessionOutputRequest() diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py index 3aec5a9d..e9b022cf 100644 --- a/tests/test_kernels_logs.py +++ b/tests/test_kernels_logs.py @@ -2,6 +2,7 @@ import io import json import sys +import tempfile import unittest from unittest.mock import MagicMock, patch @@ -29,6 +30,84 @@ def setUp(self): self.api = KaggleApi.__new__(KaggleApi) self.api.config_values = {"username": "testuser"} + @patch("kaggle.api.kaggle_api_extended.requests.get") + @patch.object(KaggleApi, "build_kaggle_client") + def test_kernels_output_file_pattern_searches_all_pages(self, mock_client, mock_get): + """Test output download applies file_pattern across all paged results.""" + first_response = MagicMock() + first_response.files = [MagicMock(file_name="first.txt", url="https://example.com/first.txt")] + first_response.next_page_token = "page-2" + first_response.log = None + + second_response = MagicMock() + second_response.files = [MagicMock(file_name="result.png", url="https://example.com/result.png")] + second_response.next_page_token = "" + second_response.log = None + + mock_kaggle = MagicMock() + mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.side_effect = [ + first_response, + second_response, + ] + mock_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle) + mock_client.return_value.__exit__ = MagicMock(return_value=False) + mock_get.return_value = MagicMock(content=b"png") + self.api.download_needed = MagicMock(return_value=True) + + with tempfile.TemporaryDirectory() as temp_dir: + outfiles, token = self.api.kernels_output( + "owner/kernel-slug", temp_dir, file_pattern=r".*\.png$", quiet=True, page_size=1 + ) + + self.assertEqual(token, 
"") + self.assertEqual(len(outfiles), 1) + self.assertTrue(outfiles[0].endswith("result.png")) + mock_get.assert_called_once_with("https://example.com/result.png", stream=True) + self.assertEqual(mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.call_count, 2) + second_request = mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.call_args_list[1][0][0] + self.assertEqual(second_request.page_token, "page-2") + self.assertEqual(second_request.page_size, 1) + + @patch("kaggle.api.kaggle_api_extended.requests.get") + @patch.object(KaggleApi, "build_kaggle_client") + def test_kernels_output_page_token_downloads_specific_page(self, mock_client, mock_get): + """Test output download uses a supplied page token for one page only.""" + response = MagicMock() + response.files = [MagicMock(file_name="page-file.csv", url="https://example.com/page-file.csv")] + response.next_page_token = "page-3" + response.log = None + + mock_kaggle = MagicMock() + mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.return_value = response + mock_client.return_value.__enter__ = MagicMock(return_value=mock_kaggle) + mock_client.return_value.__exit__ = MagicMock(return_value=False) + mock_get.return_value = MagicMock(content=b"csv") + self.api.download_needed = MagicMock(return_value=True) + + with tempfile.TemporaryDirectory() as temp_dir: + outfiles, token = self.api.kernels_output( + "owner/kernel-slug", temp_dir, quiet=True, page_token="page-2", page_size=50 + ) + + self.assertEqual(token, "page-3") + self.assertEqual(len(outfiles), 1) + self.assertTrue(outfiles[0].endswith("page-file.csv")) + mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.assert_called_once() + request = mock_kaggle.kernels.kernels_api_client.list_kernel_session_output.call_args[0][0] + self.assertEqual(request.page_token, "page-2") + self.assertEqual(request.page_size, 50) + + @patch.object(KaggleApi, "kernels_output") + def 
test_kernels_output_cli_passes_page_size(self, mock_output): + """Test CLI wrapper passes page_size to kernels_output.""" + mock_output.return_value = ([], "") + + self.api.kernels_output_cli("owner/kernel-slug", page_size=75) + + mock_output.assert_called_once_with( + "owner/kernel-slug", None, None, False, False, page_token=None, page_size=75 + ) + # ------------------------------------------------------------------ # kernels_logs (one-shot, persisted blob) # ------------------------------------------------------------------ From 91036fc7a4e883213932ecdfe6972c5e859a588e Mon Sep 17 00:00:00 2001 From: herbison Date: Tue, 16 Jun 2026 18:15:43 +0000 Subject: [PATCH 6/9] Align test_kernels_logs.py imports with main to clear conflicts Main reordered the imports and dropped the section-divider comment in a follow-up commit, leaving this branch with two fresh merge conflicts on rebase. Restructure the imports to match main's layout (with `import json` inserted in the new order) and drop the divider so the rebase auto-merges cleanly. 
Co-authored-by: kaggle-agent --- tests/test_kernels_logs.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py index e9b022cf..84446a85 100644 --- a/tests/test_kernels_logs.py +++ b/tests/test_kernels_logs.py @@ -1,10 +1,10 @@ # coding=utf-8 +import unittest +from unittest.mock import patch, MagicMock, call import io import json -import sys import tempfile -import unittest -from unittest.mock import MagicMock, patch +import sys sys.path.insert(0, "..") @@ -108,10 +108,6 @@ def test_kernels_output_cli_passes_page_size(self, mock_output): "owner/kernel-slug", None, None, False, False, page_token=None, page_size=75 ) - # ------------------------------------------------------------------ - # kernels_logs (one-shot, persisted blob) - # ------------------------------------------------------------------ - @patch.object(KaggleApi, "build_kaggle_client") @patch.object(KaggleApi, "validate_kernel_string") def test_kernels_logs_returns_log_string(self, mock_validate, mock_client): From 4a8c2265e8dca6623c2667923c2289d9b8d7b086 Mon Sep 17 00:00:00 2001 From: herbison Date: Tue, 16 Jun 2026 21:47:31 +0000 Subject: [PATCH 7/9] Reconnect kernel log stream with dedup on transient drops Load balancers cut idle SSE connections after ~2 minutes, so a long-running `kaggle kernels logs --follow` would die mid-session. The CLI now wraps `kernels_logs_stream()` in a retry loop, skipping the events the server replays from the start of each reconnect, and gives up after 5 consecutive failures with no new data.
Co-authored-by: kaggle-agent --- src/kaggle/api/kaggle_api_extended.py | 47 +++++++++++++++++++--- tests/test_kernels_logs.py | 58 +++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index ccf9a3e6..ad54850a 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -5340,6 +5340,12 @@ def _iter_blob_lines(self, response) -> Iterator[Dict[str, str]]: continue yield {"data": raw_line} + # `--follow` reconnects on transient network drops (e.g. load-balancer + # idle timeouts). On reconnect the server replays the stream from the + # beginning, so we dedup by counting events handled so far. + _LOG_STREAM_MAX_FAILURES = 5 + _LOG_STREAM_RECONNECT_DELAY_SEC = 1 + def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=None): """Print kernel execution logs to stdout. @@ -5347,6 +5353,9 @@ def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=None) kernel's latest session. In `--follow` mode attaches to the midtier SSE log stream and prints log lines as they are produced by the running session, exiting when the server signals end-of-stream. + Transient connection drops are retried transparently; the server + replays from the beginning on reconnect and already-seen events + are skipped. Args: kernel: The kernel for which to retrieve the logs. 
@@ -5361,11 +5370,39 @@ def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=None) print(self.kernels_logs(kernel)) return - for event in self.kernels_logs_stream(kernel): - data = event.get("data") - if data is None: - continue - print(data, flush=True, end="" if data.endswith("\n") else "\n") + seen_count = 0 + failures_without_progress = 0 + + while True: + seen_before = seen_count + try: + for index, event in enumerate(self.kernels_logs_stream(kernel)): + if index < seen_count: + continue + seen_count = index + 1 + data = event.get("data") + if data is None: + continue + print(data, flush=True, end="" if data.endswith("\n") else "\n") + return + except ( + requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError, + urllib3_exceptions.ProtocolError, + ): + if seen_count == seen_before: + failures_without_progress += 1 + else: + failures_without_progress = 0 + if failures_without_progress >= self._LOG_STREAM_MAX_FAILURES: + print( + f"Log stream connection failed {self._LOG_STREAM_MAX_FAILURES} " + "times with no new data; giving up.", + file=sys.stderr, + ) + return + print("Log stream connection lost, reconnecting...", file=sys.stderr) + time.sleep(self._LOG_STREAM_RECONNECT_DELAY_SEC) def model_get(self, model: str) -> ApiModel: """Gets a model. 
diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py index 84446a85..50fefc89 100644 --- a/tests/test_kernels_logs.py +++ b/tests/test_kernels_logs.py @@ -368,6 +368,64 @@ def test_kernels_logs_cli_interval_is_ignored(self, mock_logs, mock_stream): self.api.kernels_logs_cli("owner/kernel-slug", follow=True, interval=42) mock_logs.assert_not_called() + @patch("kaggle.api.kaggle_api_extended.time.sleep") + @patch.object(KaggleApi, "kernels_logs_stream") + def test_kernels_logs_cli_follow_reconnects_and_dedupes(self, mock_stream, mock_sleep): + """On a mid-stream drop the CLI reconnects and skips replayed events.""" + import requests as _requests + + def first_attempt(): + yield {"stream_name": "stdout", "time": "t1", "data": "one"} + yield {"stream_name": "stdout", "time": "t2", "data": "two"} + raise _requests.exceptions.ChunkedEncodingError("dropped") + + def second_attempt(): + # Server replays from the beginning on reconnect. + yield {"stream_name": "stdout", "time": "t1", "data": "one"} + yield {"stream_name": "stdout", "time": "t2", "data": "two"} + yield {"stream_name": "stdout", "time": "t3", "data": "three"} + + mock_stream.side_effect = [first_attempt(), second_attempt()] + + captured_out = io.StringIO() + captured_err = io.StringIO() + sys.stdout = captured_out + sys.stderr = captured_err + try: + self.api.kernels_logs_cli("owner/kernel-slug", follow=True) + finally: + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + + self.assertEqual(captured_out.getvalue(), "one\ntwo\nthree\n") + self.assertIn("reconnecting", captured_err.getvalue()) + self.assertEqual(mock_stream.call_count, 2) + mock_sleep.assert_called_once() + + @patch("kaggle.api.kaggle_api_extended.time.sleep") + @patch.object(KaggleApi, "kernels_logs_stream") + def test_kernels_logs_cli_follow_gives_up_after_max_failures(self, mock_stream, mock_sleep): + """After repeated failures with no new data the CLI exits gracefully.""" + import requests as _requests + + def 
always_fails(): + if False: + yield # generator that never yields, then raises + raise _requests.exceptions.ConnectionError("nope") + + # Five consecutive failures with no progress should trigger giveup. + mock_stream.side_effect = [always_fails() for _ in range(5)] + + captured_err = io.StringIO() + sys.stderr = captured_err + try: + self.api.kernels_logs_cli("owner/kernel-slug", follow=True) + finally: + sys.stderr = sys.__stderr__ + + self.assertIn("giving up", captured_err.getvalue()) + self.assertEqual(mock_stream.call_count, 5) + if __name__ == "__main__": unittest.main() From e2cf2944851488a4d753169f459d550183729388 Mon Sep 17 00:00:00 2001 From: herbison Date: Tue, 16 Jun 2026 22:40:47 +0000 Subject: [PATCH 8/9] Silence reconnect warning on single transient drop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The load balancer resets idle SSE connections roughly every 3 minutes, so any long `--follow` session normally hits one drop and recovers silently on retry. Logging a warning each time would clutter stderr in the common case, so the message now only fires on the second consecutive failure with no new data — i.e. when something actually looks wrong. 
Co-authored-by: kaggle-agent --- src/kaggle/api/kaggle_api_extended.py | 6 ++++- tests/test_kernels_logs.py | 36 ++++++++++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index ad54850a..0bde5672 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -5401,7 +5401,11 @@ def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=None) file=sys.stderr, ) return - print("Log stream connection lost, reconnecting...", file=sys.stderr) + # Stay quiet on the first reconnect — the load balancer cuts + # idle SSE connections every few minutes, so a single retry is + # the common case and shouldn't be reported as noise. + if failures_without_progress > 1: + print("Log stream connection lost, reconnecting...", file=sys.stderr) time.sleep(self._LOG_STREAM_RECONNECT_DELAY_SEC) def model_get(self, model: str) -> ApiModel: diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py index 50fefc89..2159d3c5 100644 --- a/tests/test_kernels_logs.py +++ b/tests/test_kernels_logs.py @@ -398,10 +398,44 @@ def second_attempt(): sys.stderr = sys.__stderr__ self.assertEqual(captured_out.getvalue(), "one\ntwo\nthree\n") - self.assertIn("reconnecting", captured_err.getvalue()) + # The first reconnect after a successful read stays silent — the LB + # cuts idle SSE connections routinely, so reporting it would be noise. 
+ self.assertEqual(captured_err.getvalue(), "") self.assertEqual(mock_stream.call_count, 2) mock_sleep.assert_called_once() + @patch("kaggle.api.kaggle_api_extended.time.sleep") + @patch.object(KaggleApi, "kernels_logs_stream") + def test_kernels_logs_cli_follow_reports_only_repeat_failures(self, mock_stream, mock_sleep): + """A second consecutive failure with no new data surfaces the warning.""" + import requests as _requests + + def fail_immediately(): + if False: + yield + raise _requests.exceptions.ConnectionError("nope") + + def succeed(): + yield {"stream_name": "stdout", "time": "t1", "data": "done"} + + # Two back-to-back failures (no progress), then a clean stream. + mock_stream.side_effect = [fail_immediately(), fail_immediately(), succeed()] + + captured_out = io.StringIO() + captured_err = io.StringIO() + sys.stdout = captured_out + sys.stderr = captured_err + try: + self.api.kernels_logs_cli("owner/kernel-slug", follow=True) + finally: + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + + self.assertEqual(captured_out.getvalue(), "done\n") + # Exactly one "reconnecting" message — printed on the second failure + # (the first stays silent). + self.assertEqual(captured_err.getvalue().count("reconnecting"), 1) + @patch("kaggle.api.kaggle_api_extended.time.sleep") @patch.object(KaggleApi, "kernels_logs_stream") def test_kernels_logs_cli_follow_gives_up_after_max_failures(self, mock_stream, mock_sleep): From 6b03fbe4efbdc766bd5f828d4de0e44370cd60a1 Mon Sep 17 00:00:00 2001 From: herbison Date: Tue, 16 Jun 2026 22:50:53 +0000 Subject: [PATCH 9/9] Render completed-session logs the same way as live streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The midtier serves the persisted log blob as a JSON array of `{stream_name, time, data}` objects — the same shape as live SSE events — but the client was dumping it as raw text, so completed-session output looked like JSON noise instead of log lines.
Parse the array and yield each entry, so the CLI's existing per-event rendering produces identical output whether the session is live or finished. Co-authored-by: kaggle-agent --- src/kaggle/api/kaggle_api_extended.py | 28 ++++++++++--- tests/test_kernels_logs.py | 57 ++++++++++++++++++++++----- 2 files changed, 71 insertions(+), 14 deletions(-) diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index 0bde5672..017ff807 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -5334,11 +5334,29 @@ def _iter_sse_events(self, response) -> Iterator[Dict[str, str]]: yield {"data": payload} def _iter_blob_lines(self, response) -> Iterator[Dict[str, str]]: - """Yield one event per line from a non-SSE blob (completed session fallback).""" - for raw_line in response.iter_lines(decode_unicode=True): - if raw_line is None: - continue - yield {"data": raw_line} + """Yield events from a non-SSE blob (completed session fallback). + + The midtier serves the persisted GCS log as a JSON array of + `{stream_name, time, data}` objects — the same shape as live SSE + events. Parsing and yielding each entry lets the CLI render + completed-session output the same way as a live stream (one + `data` value per line) instead of dumping raw JSON. Unknown + blob formats fall back to line-by-line so callers still see + something readable. + """ + body = response.text + try: + payload = json.loads(body) + except (json.JSONDecodeError, ValueError): + for line in body.splitlines(): + if line: + yield {"data": line} + return + + events = payload if isinstance(payload, list) else [payload] + for event in events: + if isinstance(event, dict): + yield event # `--follow` reconnects on transient network drops (e.g. load-balancer # idle timeouts). 
On reconnect the server replays the stream from the diff --git a/tests/test_kernels_logs.py b/tests/test_kernels_logs.py index 2159d3c5..74b22cc0 100644 --- a/tests/test_kernels_logs.py +++ b/tests/test_kernels_logs.py @@ -189,10 +189,20 @@ def _sse_response(events): return response @staticmethod - def _blob_response(lines, content_type="text/plain"): + def _blob_response(events, content_type="application/json"): + # The midtier serves completed-session logs as a JSON array of + # {stream_name, time, data} objects — same shape as live SSE events. response = MagicMock() response.headers = {"Content-Type": content_type} - response.iter_lines.return_value = iter(lines) + response.text = json.dumps(events) + response.raise_for_status = MagicMock() + return response + + @staticmethod + def _raw_blob_response(body, content_type="text/plain"): + response = MagicMock() + response.headers = {"Content-Type": content_type} + response.text = body response.raise_for_status = MagicMock() return response @@ -251,8 +261,16 @@ def test_kernels_logs_stream_handles_non_json_payload(self, _validate, mock_clie @patch.object(KaggleApi, "validate_kernel_string") def test_kernels_logs_stream_falls_back_to_blob_for_completed_session(self, _validate, mock_client): # When the session is done the midtier returns the persisted GCS blob - # with a non-SSE content type. We should yield one event per line. - response = self._blob_response(["line one", "line two", "line three"]) + # as a JSON array of {stream_name, time, data} objects — same shape + # as live SSE events. Yield them as-is so the CLI renders them + # identically to a live stream. 
+ response = self._blob_response( + [ + {"stream_name": "stdout", "time": 1.0, "data": "line one\n"}, + {"stream_name": "stderr", "time": 2.0, "data": "line two\n"}, + {"stream_name": "stdout", "time": 3.0, "data": "line three\n"}, + ] + ) cm, _ = self._make_streaming_kaggle_client(response) with patch("kaggle.api.kaggle_api_extended.KaggleEnv") as mock_env: @@ -262,16 +280,37 @@ def test_kernels_logs_stream_falls_back_to_blob_for_completed_session(self, _val result = list(self.api.kernels_logs_stream("owner/kernel-slug")) self.assertEqual( - result, - [{"data": "line one"}, {"data": "line two"}, {"data": "line three"}], + [event["data"] for event in result], + ["line one\n", "line two\n", "line three\n"], ) + self.assertEqual(result[1]["stream_name"], "stderr") response.close.assert_called_once() @patch.object(KaggleApi, "build_kaggle_client") @patch.object(KaggleApi, "validate_kernel_string") def test_kernels_logs_stream_blob_fallback_with_octet_stream(self, _validate, mock_client): - # GCS blobs may come back as application/octet-stream; same handling. - response = self._blob_response(["only-line"], content_type="application/octet-stream") + # GCS blobs may come back as application/octet-stream; same JSON + # handling applies. 
+ response = self._blob_response( + [{"stream_name": "stdout", "time": 1.0, "data": "only-line\n"}], + content_type="application/octet-stream", + ) + + cm, _ = self._make_streaming_kaggle_client(response) + with patch("kaggle.api.kaggle_api_extended.KaggleEnv") as mock_env: + mock_env.PROD = "PROD" + cm.__enter__.return_value._http_client._env = "LOCAL" + mock_client.return_value = cm + result = list(self.api.kernels_logs_stream("owner/kernel-slug")) + + self.assertEqual([event["data"] for event in result], ["only-line\n"]) + + @patch.object(KaggleApi, "build_kaggle_client") + @patch.object(KaggleApi, "validate_kernel_string") + def test_kernels_logs_stream_blob_fallback_handles_non_json(self, _validate, mock_client): + # If the blob isn't JSON (legacy or unexpected format), still yield + # something readable line-by-line. + response = self._raw_blob_response("plain line one\nplain line two\n") cm, _ = self._make_streaming_kaggle_client(response) with patch("kaggle.api.kaggle_api_extended.KaggleEnv") as mock_env: @@ -280,7 +319,7 @@ def test_kernels_logs_stream_blob_fallback_with_octet_stream(self, _validate, mo mock_client.return_value = cm result = list(self.api.kernels_logs_stream("owner/kernel-slug")) - self.assertEqual(result, [{"data": "only-line"}]) + self.assertEqual(result, [{"data": "plain line one"}, {"data": "plain line two"}]) def test_kernels_logs_stream_raises_when_kernel_none(self): with self.assertRaises(ValueError):