Skip to content
Merged
200 changes: 162 additions & 38 deletions src/kaggle/api/kaggle_api_extended.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@
ApiDeleteKernelRequest,
ApiGetAcceleratorQuotaStatisticsRequest,
)
from kagglesdk.kernels.types.kernels_enums import KernelWorkerStatus, KernelsListSortType, KernelsListViewType
from kagglesdk.kernels.types.kernels_enums import KernelsListSortType, KernelsListViewType
from kagglesdk.models.types.model_api_service import (
ApiListModelsRequest,
ApiCreateModelRequest,
Expand Down Expand Up @@ -216,7 +216,7 @@
from enum import EnumMeta
from requests.exceptions import HTTPError
from requests.models import Response
from typing import Callable, cast, Dict, List, Mapping, Optional, Tuple, Union, TypeVar, Iterable
from typing import Callable, cast, Dict, Iterator, List, Mapping, Optional, Tuple, Union, TypeVar, Iterable

T = TypeVar("T")

Expand Down Expand Up @@ -5255,52 +5255,176 @@ def kernels_logs(self, kernel: str) -> str:
raise
return response.log or ""

def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=5):
def _split_kernel(self, kernel: str) -> Tuple[str, str]:
    """Split a kernel identifier into (owner_slug, kernel_slug).

    Args:
        kernel: Either "owner/kernel-slug" or a bare "kernel-slug".

    Returns:
        Tuple[str, str]: The owner slug and the kernel slug. For a bare
        slug the owner is the configured user (CONFIG_NAME_USER), or an
        empty string when no user is configured.

    Raises:
        ValueError: If `kernel` is None or empty.
    """
    # Reject "" as well as None: an empty identifier can never name a kernel
    # and would otherwise produce an empty slug.
    if not kernel:
        raise ValueError("A kernel must be specified")
    if "/" in kernel:
        self.validate_kernel_string(kernel)
        # Split only on the first "/" so exactly two parts come back.
        owner_slug, kernel_slug = kernel.split("/", 1)
        return owner_slug, kernel_slug
    # Bare slug: fall back to the configured user as the owner.
    owner_slug = self.get_config_value(self.CONFIG_NAME_USER) or ""
    return owner_slug, kernel

# Sentinel value emitted by the streaming endpoint to signal end-of-stream.
# `_iter_sse_events` stops iterating (without yielding it) when the payload
# of a `data:` line equals this value.
_LOG_STREAM_END_SENTINEL = "END_OF_LOG"

def kernels_logs_stream(self, kernel: str) -> Iterator[Dict[str, str]]:
    """Stream execution logs for a kernel via the midtier logs endpoint.

    `GET /api/v1/kernels/logs/stream/{owner}/{slug}` adapts to the session
    state: while the session is running it proxies the upstream SSE feed
    (`Content-Type: text/event-stream`, JSON `{stream_name, time, data}`
    events terminated by an `END_OF_LOG` sentinel); once the session is
    done it returns the persisted log blob from GCS with a non-SSE
    content type. We branch on `Content-Type` and yield uniform
    `{"data": ...}` events either way.

    This is a generator: the HTTP request is only issued once iteration
    starts, and the response is closed when iteration finishes or the
    generator is closed.

    Args:
        kernel: The kernel identifier in the format owner/kernel-slug.

    Yields:
        Dict[str, str]: Parsed event payloads.

    Raises:
        ValueError: If no kernel is given, or the server answers 401/403.
        HTTPError: For any other non-success HTTP status.
    """
    owner_slug, kernel_slug = self._split_kernel(kernel)

    with self.build_kaggle_client() as kaggle:
        http = kaggle._http_client
        http._init_session()
        base = http._endpoint if http._env == KaggleEnv.PROD else f"{http._endpoint}/api"
        url = f"{base}/v1/kernels/logs/stream/{owner_slug}/{kernel_slug}"

        # Start from the session's headers, but ask for SSE (accepting any
        # other type for the blob fallback) and drop Content-Type since this
        # GET sends no body.
        headers = dict(http._session.headers)
        headers["Accept"] = "text/event-stream, */*"
        headers.pop("Content-Type", None)

        response = None
        try:
            response = http._session.get(url, stream=True, headers=headers, auth=http._session.auth)
            response.raise_for_status()
        except HTTPError as e:
            # With stream=True the connection stays checked out until the
            # response is closed, so release it before propagating the error.
            if response is not None:
                response.close()
            if e.response is not None and e.response.status_code in (401, 403):
                raise ValueError(
                    f"Cannot stream logs for kernel '{kernel}' "
                    "(Permission 'kernels.get' was denied). "
                    "The most likely cause is a wrong kernel slug. "
                    "Use the slug from the notebook URL (kaggle.com/code/owner/KERNEL-SLUG)."
                ) from e
            raise

        try:
            content_type = (response.headers.get("Content-Type") or "").lower()
            if content_type.startswith("text/event-stream"):
                yield from self._iter_sse_events(response)
            else:
                yield from self._iter_blob_lines(response)
        finally:
            response.close()

def _iter_sse_events(self, response) -> Iterator[Dict[str, str]]:
    """Parse `data:` lines from a live SSE response, stopping on the sentinel.

    Lines that are empty or do not start with `data:` are ignored. Each
    payload that parses as a JSON object is yielded as-is; anything else
    (invalid JSON, or valid JSON that is not an object) is wrapped as
    `{"data": <raw payload text>}` so every yielded event is a dict.

    Args:
        response: A streamed response object exposing `iter_lines()`.

    Yields:
        Dict[str, str]: One event per `data:` line, until a payload equal to
        `_LOG_STREAM_END_SENTINEL` is seen.
    """
    sentinel = self._LOG_STREAM_END_SENTINEL
    for raw_line in response.iter_lines():
        # Decode explicitly as UTF-8 (the encoding of event streams) rather
        # than relying on the encoding guessed from the response headers.
        if isinstance(raw_line, bytes):
            raw_line = raw_line.decode("utf-8", errors="replace")
        if not raw_line or not raw_line.startswith("data:"):
            continue
        payload = raw_line[len("data:"):]
        # Only the single separator space belongs to the framing; further
        # leading whitespace is part of the payload (e.g. indented log text).
        if payload.startswith(" "):
            payload = payload[1:]
        if payload.strip() == sentinel:
            return
        try:
            event = json.loads(payload)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            event = None
        if isinstance(event, dict):
            yield event
        else:
            yield {"data": payload}

def _iter_blob_lines(self, response) -> Iterator[Dict[str, str]]:
    """Yield events from a non-SSE blob (completed session fallback).

    The midtier serves the persisted GCS log as a JSON array of
    `{stream_name, time, data}` objects — the same shape as live SSE
    events. Parsing and yielding each entry lets the CLI render
    completed-session output the same way as a live stream (one
    `data` value per line) instead of dumping raw JSON. Unknown
    blob formats fall back to line-by-line so callers still see
    something readable.

    Args:
        response: A response object exposing the full body as `.text`.

    Yields:
        Dict[str, str]: Dict entries are yielded unchanged. String entries
        are wrapped as `{"data": entry}`, other non-null JSON values as
        `{"data": <JSON text>}`. For a body that is not valid JSON, one
        `{"data": line}` per non-empty line.
    """
    body = response.text
    try:
        payload = json.loads(body)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        for line in body.splitlines():
            if line:
                yield {"data": line}
        return

    entries = payload if isinstance(payload, list) else [payload]
    for entry in entries:
        if isinstance(entry, dict):
            yield entry
        elif isinstance(entry, str):
            yield {"data": entry}
        elif entry is not None:
            # Surface unexpected scalars/arrays instead of dropping them.
            yield {"data": json.dumps(entry)}

# `--follow` reconnects on transient network drops (e.g. load-balancer
# idle timeouts). On reconnect the server replays the stream from the
# beginning, so we dedup by counting events handled so far.
#
# Number of consecutive connection failures that delivered no new events
# after which `kernels_logs_cli` gives up.
_LOG_STREAM_MAX_FAILURES = 5
# Seconds `kernels_logs_cli` sleeps before each reconnect attempt.
_LOG_STREAM_RECONNECT_DELAY_SEC = 1

def kernels_logs_cli(self, kernel, kernel_opt=None, follow=False, interval=None):
    """Print kernel execution logs to stdout.

    In one-shot mode (default) prints the persisted log blob for the
    kernel's latest session. In `--follow` mode attaches to the midtier
    SSE log stream and prints log lines as they are produced by the
    running session, exiting when the server signals end-of-stream.
    Transient connection drops are retried transparently; the server
    replays from the beginning on reconnect and already-seen events
    are skipped.

    Args:
        kernel: The kernel for which to retrieve the logs.
        kernel_opt: An alternative option to providing a kernel.
        follow: If True, attach to the live log stream.
        interval: Deprecated; retained for CLI backwards compatibility.
    """
    del interval  # No longer used; live streaming is push-based.
    kernel = kernel or kernel_opt

    if not follow:
        print(self.kernels_logs(kernel))
        return

    # Number of events already handled across all connections; used to skip
    # the replayed prefix after a reconnect.
    seen_count = 0
    failures_without_progress = 0

    while True:
        seen_before = seen_count
        try:
            for index, event in enumerate(self.kernels_logs_stream(kernel)):
                if index < seen_count:
                    continue
                seen_count = index + 1
                # Tolerate events that are not dicts, and `data` values that
                # are not strings, rather than crashing mid-stream.
                data = event.get("data") if isinstance(event, dict) else event
                if data is None:
                    continue
                text = data if isinstance(data, str) else str(data)
                print(text, flush=True, end="" if text.endswith("\n") else "\n")
            # The stream ended normally (sentinel or end of blob).
            return
        except (
            requests.exceptions.ChunkedEncodingError,
            requests.exceptions.ConnectionError,
            urllib3_exceptions.ProtocolError,
        ):
            # Only failures that delivered nothing new count towards the
            # give-up limit; any progress resets the counter.
            if seen_count == seen_before:
                failures_without_progress += 1
            else:
                failures_without_progress = 0
            if failures_without_progress >= self._LOG_STREAM_MAX_FAILURES:
                print(
                    f"Log stream connection failed {self._LOG_STREAM_MAX_FAILURES} "
                    "times with no new data; giving up.",
                    file=sys.stderr,
                )
                return
            # Stay quiet on the first reconnect — the load balancer cuts
            # idle SSE connections every few minutes, so a single retry is
            # the common case and shouldn't be reported as noise.
            if failures_without_progress > 1:
                print("Log stream connection lost, reconnecting...", file=sys.stderr)
            time.sleep(self._LOG_STREAM_RECONNECT_DELAY_SEC)

def model_get(self, model: str) -> ApiModel:
"""Gets a model.
Expand Down
4 changes: 2 additions & 2 deletions src/kaggle/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2284,8 +2284,8 @@ class Help(object):
"Regex pattern to match against filenames. Only files matching the pattern will be downloaded."
)
param_kernel_acc = "Specify the type of accelerator to use for the kernel run"
# Help text for the `--follow` option of the kernel logs command.
param_kernel_logs_follow = "Stream live execution logs from the running session (like tail -f)"
# Deprecated; live streaming is push-based. argparse.SUPPRESS is used as the
# help value so the option is left out of the generated help output.
param_kernel_logs_interval = argparse.SUPPRESS

# Models params
param_model = "Model URL suffix in format <owner>/<model-name>"
Expand Down
Loading
Loading