Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ dynamic = ["version"]
readme = "README.md"
dependencies = [
"torch>=2.7",
"opentelemetry-exporter-otlp-proto-http>=1.37.0",
"opentelemetry-sdk>=1.37.0",
"opentelemetry-api>=1.37.0",
"opentelemetry-exporter-otlp-proto-http>=1.39.0",
"opentelemetry-sdk>=1.39.0",
"opentelemetry-api>=1.39.0",
]

[project.urls]
Expand Down
2 changes: 1 addition & 1 deletion torchft/optim.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ def param_groups(self) -> List[Dict[str, Any]]:
return self.optim.param_groups

@property
def state(self) -> Mapping[torch.Tensor, Any]: # pyre-fixme[3]
def state(self) -> Mapping[torch.Tensor, object]:
return self.optim.state
40 changes: 27 additions & 13 deletions torchft/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,53 @@
import logging
import os
import time
from typing import List, Sequence
from typing import Any, List, Sequence, TYPE_CHECKING

from opentelemetry._logs import set_logger_provider

from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs._internal import LogData
from opentelemetry.sdk._logs.export import (
BatchLogRecordProcessor,
ConsoleLogExporter,
LogExporter,
)
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource

# These types are available in opentelemetry-sdk but Pyre's type stubs
# don't include them. We import them at runtime and provide type aliases for
# static type checking.
if TYPE_CHECKING:
# pyre-fixme[33]: Aliasing to Any is prohibited. opentelemetry-sdk lacks type stubs.
ReadableLogRecord = Any
# pyre-fixme[33]: Aliasing to Any is prohibited. opentelemetry-sdk lacks type stubs.
LogRecordExporter = Any
# pyre-fixme[33]: Aliasing to Any is prohibited. opentelemetry-sdk lacks type stubs.
LogRecordExportResult = Any
# pyre-fixme[33]: Aliasing to Any is prohibited. opentelemetry-sdk lacks type stubs.
ConsoleLogRecordExporter = Any
else:
from opentelemetry.sdk._logs import ReadableLogRecord
from opentelemetry.sdk._logs.export import (
ConsoleLogRecordExporter,
LogRecordExporter,
LogRecordExportResult,
)

_LOGGER_PROVIDER: dict[str, LoggerProvider] = {}
# Path to the file containing OTEL resource attributes
TORCHFT_OTEL_RESOURCE_ATTRIBUTES_JSON = "TORCHFT_OTEL_RESOURCE_ATTRIBUTES_JSON"


class TeeLogExporter(LogExporter):
class TeeLogExporter(LogRecordExporter):
"""Exporter that writes to multiple exporters."""

def __init__(
self,
exporters: List[LogExporter],
exporters: List[LogRecordExporter],
) -> None:
self._exporters = exporters

def export(self, batch: Sequence[LogData]) -> None:
def export(self, batch: Sequence[ReadableLogRecord]) -> LogRecordExportResult:
for e in self._exporters:
e.export(batch)
return LogRecordExportResult.SUCCESS

def shutdown(self) -> None:
for e in self._exporters:
Expand All @@ -49,8 +65,6 @@ def setup_logger(name: str) -> None:
if os.environ.get("TORCHFT_USE_OTEL", "false") == "false":
return

global _LOGGER_PROVIDER

if name in _LOGGER_PROVIDER:
return

Expand All @@ -70,7 +84,7 @@ def setup_logger(name: str) -> None:

exporter = TeeLogExporter(
exporters=[
ConsoleLogExporter(),
ConsoleLogRecordExporter(),
OTLPLogExporter(
timeout=5,
),
Expand Down
7 changes: 5 additions & 2 deletions torchft/process_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def _test_pg(
]
tensor_list = [torch.empty_like(input_tensor)]

def check_tensors(arg: Any) -> None: # pyre-ignore[2]
def check_tensors(arg: object) -> None:
"""Recursively check tensors for expected shape and dtype."""
if isinstance(arg, torch.Tensor):
assert arg.dtype == dtype, f"Output dtype mismatch: {arg.dtype} != {dtype}"
Expand Down Expand Up @@ -738,7 +738,10 @@ def test_functional_collectives(self) -> None:

self.assertEqual(pg.group_name, str(dist.get_pg_count() - 1))

self.assertIs(_resolve_process_group(pg.group_name), pg)
self.assertIs(
_resolve_process_group(pg.group_name), # pyre-ignore[6]: GroupName vs str
pg,
)

try:
t = torch.zeros(10)
Expand Down
Loading