Skip to content
Merged
178 changes: 140 additions & 38 deletions src/kaggle/api/kaggle_api_extended.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@
ApiDeleteKernelRequest,
ApiGetAcceleratorQuotaStatisticsRequest,
)
from kagglesdk.kernels.types.kernels_enums import KernelWorkerStatus, KernelsListSortType, KernelsListViewType
from kagglesdk.kernels.types.kernels_enums import KernelsListSortType, KernelsListViewType
from kagglesdk.models.types.model_api_service import (
ApiListModelsRequest,
ApiCreateModelRequest,
Expand Down Expand Up @@ -216,7 +216,7 @@
from enum import EnumMeta
from requests.exceptions import HTTPError
from requests.models import Response
from typing import Callable, cast, Dict, List, Mapping, Optional, Tuple, Union, TypeVar, Iterable
from typing import Callable, cast, Dict, Iterator, List, Mapping, Optional, Tuple, Union, TypeVar, Iterable

T = TypeVar("T")

Expand Down Expand Up @@ -5255,52 +5255,154 @@ def kernels_logs(self, kernel: str) -> str:
raise
return response.log or ""

def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=5):
def _split_kernel(self, kernel: str) -> Tuple[str, str]:
    """Split a kernel identifier into ``(owner_slug, kernel_slug)``.

    Args:
        kernel: Either ``owner/kernel-slug`` or a bare ``kernel-slug``.

    Returns:
        The owner slug and the kernel slug. For a bare slug the owner is
        the value stored under ``CONFIG_NAME_USER``, or ``""`` when that
        value is unset.

    Raises:
        ValueError: If ``kernel`` is ``None`` or empty. An ``owner/slug``
            identifier is also passed through ``validate_kernel_string``,
            which presumably raises on malformed input (not visible here).
    """
    # Reject "" as well as None: an empty slug would otherwise be spliced
    # into the request URL and fail with an unrelated server-side error.
    if not kernel:
        raise ValueError("A kernel must be specified")

    if "/" not in kernel:
        # Bare slug: fall back to the configured user as the owner.
        return self.get_config_value(self.CONFIG_NAME_USER) or "", kernel

    self.validate_kernel_string(kernel)
    owner_slug, kernel_slug = kernel.split("/", 1)
    return owner_slug, kernel_slug

# Sentinel payload emitted by the streaming endpoint to signal end-of-stream.
# `_iter_sse_events` stops iterating when a `data:` line carries exactly this
# value; the sentinel itself is never yielded to callers.
_LOG_STREAM_END_SENTINEL = "END_OF_LOG"

def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]:
    """Stream execution logs for a kernel via the midtier logs endpoint.

    `GET /api/v1/kernels/logs/stream/{owner}/{slug}` adapts to the session
    state: while the session is running it proxies the upstream SSE feed
    (`Content-Type: text/event-stream`, JSON `{stream_name, time, data}`
    events terminated by an `END_OF_LOG` sentinel); once the session is
    done it returns the persisted log blob from GCS with a non-SSE
    content type. We branch on `Content-Type` and yield uniform
    `{"data": ...}` events either way.

    Args:
        kernel: The kernel identifier in the format owner/kernel-slug.

    Yields:
        Dict[str, str]: Parsed event payloads.

    Raises:
        ValueError: If `kernel` is rejected by `_split_kernel`, or if the
            server answers 401/403 (reported as a likely wrong slug).
        HTTPError: For any other non-success HTTP status.
    """
    owner_slug, kernel_slug = self._split_kernel(kernel)

    with self.build_kaggle_client() as kaggle:
        http = kaggle._http_client
        http._init_session()
        # Non-PROD endpoints get an explicit "/api" path prefix.
        base = http._endpoint if http._env == KaggleEnv.PROD else f"{http._endpoint}/api"
        url = f"{base}/v1/kernels/logs/stream/{owner_slug}/{kernel_slug}"

        # Start from the session's default headers, prefer SSE, and drop
        # Content-Type because this GET carries no request body.
        headers = dict(http._session.headers)
        headers["Accept"] = "text/event-stream, */*"
        headers.pop("Content-Type", None)

        response = None
        try:
            try:
                response = http._session.get(url, stream=True, headers=headers, auth=http._session.auth)
                response.raise_for_status()
            except HTTPError as e:
                if e.response is not None and e.response.status_code in (401, 403):
                    raise ValueError(
                        f"Cannot stream logs for kernel '{kernel}' "
                        "(Permission 'kernels.get' was denied). "
                        "The most likely cause is a wrong kernel slug. "
                        "Use the slug from the notebook URL (kaggle.com/code/owner/KERNEL-SLUG)."
                    ) from e
                raise

            content_type = (response.headers.get("Content-Type") or "").lower()
            if content_type.startswith("text/event-stream"):
                yield from self._iter_sse_events(response)
            else:
                yield from self._iter_blob_lines(response)
        finally:
            # A `stream=True` response holds its connection until it is
            # closed or fully read, so release it on every exit path:
            # normal completion, HTTP errors, and early generator close.
            if response is not None:
                response.close()

def _iter_sse_events(self, response) -> Iterator[Dict[str, str]]:
    """Parse `data:` lines from a live SSE response, stopping on the sentinel.

    Args:
        response: A streamed HTTP response exposing `iter_lines` and a
            writable `encoding` attribute (e.g. `requests.Response`).

    Yields:
        One dict per `data:` line. JSON objects are yielded as parsed;
        any other payload (invalid JSON, or valid JSON that is not an
        object) is wrapped as `{"data": <raw payload text>}` so every
        event supports `.get("data")`.
    """
    # Event streams are always UTF-8. When the header carries no charset,
    # requests falls back to ISO-8859-1 for `text/*` content types, which
    # would garble non-ASCII log output, so pin the decoder explicitly.
    response.encoding = "utf-8"

    for raw_line in response.iter_lines(decode_unicode=True):
        # Skip blank separators, comments/keep-alives and non-data fields.
        if not raw_line or not raw_line.startswith("data:"):
            continue
        payload = raw_line[len("data:") :].lstrip()
        if payload == self._LOG_STREAM_END_SENTINEL:
            return
        try:
            event = json.loads(payload)
        except json.JSONDecodeError:
            event = None
        if isinstance(event, dict):
            yield event
        else:
            yield {"data": payload}

def _iter_blob_lines(self, response) -> Iterator[Dict[str, str]]:
    """Yield one event per line from a non-SSE blob (completed session fallback).

    Args:
        response: A streamed HTTP response exposing `headers`, `iter_lines`
            and a writable `encoding` attribute (e.g. `requests.Response`).

    Yields:
        `{"data": <line>}` for every line of the body, blank lines included.
    """
    # `iter_lines(decode_unicode=True)` yields raw bytes when
    # `response.encoding` is None, and requests guesses ISO-8859-1 for
    # `text/*` bodies that declare no charset. Honour an explicit charset;
    # otherwise decode as UTF-8.
    # NOTE(review): assumes persisted logs are UTF-8, like the live SSE
    # feed; confirm against the server.
    declared = (response.headers.get("Content-Type") or "").lower()
    if response.encoding is None or "charset=" not in declared:
        response.encoding = "utf-8"

    for raw_line in response.iter_lines(decode_unicode=True):
        if raw_line is None:
            continue
        yield {"data": raw_line}

# `--follow` reconnects on transient network drops (e.g. load-balancer
# idle timeouts). On reconnect the server replays the stream from the
# beginning, so `kernels_logs_cli` de-duplicates by counting the events it
# has already handled and skipping that many on the next attempt.
#
# Number of consecutive failed attempts that delivered no new events after
# which `kernels_logs_cli` gives up.
_LOG_STREAM_MAX_FAILURES = 5
# Seconds to sleep before each reconnect attempt.
_LOG_STREAM_RECONNECT_DELAY_SEC = 1

def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=None):
    """Print kernel execution logs to stdout.

    In one-shot mode (default) prints the persisted log blob for the
    kernel's latest session. In `--follow` mode attaches to the midtier
    SSE log stream and prints log lines as they are produced by the
    running session, exiting when the server signals end-of-stream.
    Transient connection drops are retried transparently; the server
    replays from the beginning on reconnect and already-seen events
    are skipped.

    Args:
        kernel: The kernel for which to retrieve the logs.
        kernel_opt: An alternative option to providing a kernel.
        follow: If True, attach to the live log stream.
        interval: Deprecated; retained for CLI backwards compatibility.
    """
    del interval  # No longer used; live streaming is push-based.
    kernel = kernel or kernel_opt

    if not follow:
        print(self.kernels_logs(kernel))
        return

    seen_count = 0  # Events handled so far, across all connection attempts.
    failures_without_progress = 0  # Consecutive drops that delivered nothing new.

    while True:
        seen_before = seen_count
        try:
            for index, event in enumerate(self.kernels_logs_stream(kernel)):
                # The server replays from the start after a reconnect; skip
                # everything that was already handled on an earlier attempt.
                if index < seen_count:
                    continue
                seen_count = index + 1
                data = event.get("data")
                if data is None:
                    continue
                # Events are parsed JSON, so `data` is not guaranteed to be
                # a string; coerce before inspecting the trailing newline.
                text = data if isinstance(data, str) else str(data)
                print(text, flush=True, end="" if text.endswith("\n") else "\n")
            return
        except (
            requests.exceptions.ChunkedEncodingError,
            requests.exceptions.ConnectionError,
            urllib3_exceptions.ProtocolError,
        ):
            if seen_count == seen_before:
                failures_without_progress += 1
            else:
                failures_without_progress = 0
            if failures_without_progress >= self._LOG_STREAM_MAX_FAILURES:
                print(
                    f"Log stream connection failed {self._LOG_STREAM_MAX_FAILURES} "
                    "times with no new data; giving up.",
                    file=sys.stderr,
                )
                return
            # The load balancer resets the connection roughly every three
            # minutes, so a single drop followed by a successful retry is
            # routine and stays silent; only repeat failures are reported.
            if failures_without_progress > 1:
                print("Log stream connection lost, reconnecting...", file=sys.stderr)
            time.sleep(self._LOG_STREAM_RECONNECT_DELAY_SEC)

def model_get(self, model: str) -> ApiModel:
"""Gets a model.
Expand Down
4 changes: 2 additions & 2 deletions src/kaggle/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2284,8 +2284,8 @@ class Help(object):
"Regex pattern to match against filenames. Only files matching the pattern will be downloaded."
)
param_kernel_acc = "Specify the type of accelerator to use for the kernel run"
param_kernel_logs_follow = "Continuously poll and print new log lines (like tail -f)"
param_kernel_logs_interval = "Polling interval in seconds for follow mode (default 5)"
param_kernel_logs_follow = "Stream live execution logs from the running session (like tail -f)"
param_kernel_logs_interval = argparse.SUPPRESS # Deprecated; live streaming is push-based.

# Models params
param_model = "Model URL suffix in format <owner>/<model-name>"
Expand Down
Loading
Loading