Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@

### New Features and Improvements

- Added `HeadersProvider` abstraction for flexible authentication strategies
- Implemented `OAuthHeadersProvider` for OAuth 2.0 Client Credentials flow (default authentication method used by `create_stream()`)

### Bug Fixes

### Documentation

- Added Azure workspace and endpoint URL examples

### Internal Changes

### API Changes

- Added `HeadersProvider` abstract base class for custom header strategies
- Added `OAuthHeadersProvider` class for OAuth 2.0 authentication with Databricks OIDC endpoint
Comment on lines 18 to +21
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add the create_stream_with_headers_provider method here too

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

151 changes: 149 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The Databricks Zerobus Ingest SDK for Python provides a high-performance client
- [Usage Examples](#usage-examples)
- [Blocking Ingestion](#blocking-ingestion)
- [Non-Blocking Ingestion](#non-blocking-ingestion)
- [Authentication](#authentication)
- [Configuration](#configuration)
- [Error Handling](#error-handling)
- [API Reference](#api-reference)
Expand Down Expand Up @@ -479,6 +480,71 @@ async def main():
asyncio.run(main())
```

## Authentication

The SDK uses OAuth 2.0 Client Credentials for authentication:

```python
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2

sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)

# Create stream with OAuth authentication
stream = sdk.create_stream(client_id, client_secret, table_properties)
```

The SDK automatically fetches access tokens and includes these headers:
- `"authorization": "Bearer <oauth_token>"` - Obtained via OAuth 2.0 Client Credentials flow
- `"x-databricks-zerobus-table-name": "<table_name>"` - The fully qualified table name

### Advanced: Custom Headers

For advanced use cases where you need to provide custom headers (e.g., for future authentication methods or additional metadata), you can implement a custom `HeadersProvider`:

```python
from zerobus.sdk.shared.headers_provider import HeadersProvider

class CustomHeadersProvider(HeadersProvider):
"""
Custom headers provider for advanced use cases.

Note: Currently, OAuth 2.0 Client Credentials (via create_stream())
is the standard authentication method. Use this only if you have
specific requirements for custom headers.
"""

def __init__(self, token: str):
self.token = token

def get_headers(self):
"""
Return headers for gRPC metadata.

Returns:
List of (header_name, header_value) tuples
"""
return [
("authorization", f"Bearer {self.token}"),
("x-custom-header", "custom-value"),
]

# Use the custom provider
custom_provider = CustomHeadersProvider(token="your-token")
stream = sdk.create_stream_with_headers_provider(
custom_provider,
table_properties
)
```

**Potential use cases for custom headers:**
- Integration with existing token management systems
- Additional metadata headers for request tracking
- Future authentication methods
- Special routing or service mesh requirements

## Configuration

### Stream Configuration Options
Expand Down Expand Up @@ -565,7 +631,22 @@ def create_stream(
options: StreamConfigurationOptions = None
) -> ZerobusStream
```
Creates a new ingestion stream. Returns a `ZerobusStream` instance.
Creates a new ingestion stream using OAuth 2.0 Client Credentials authentication.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should mention that you also have to align with existing methods of authentication. Because I can already see a situation where someone will override this and not include needed authentication headers for Zerobus and complain.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


Automatically includes these headers:
- `"authorization": "Bearer <oauth_token>"` (fetched via OAuth 2.0 Client Credentials flow)
- `"x-databricks-zerobus-table-name": "<table_name>"`

Returns a `ZerobusStream` instance.

```python
def create_stream_with_headers_provider(
headers_provider: HeadersProvider,
table_properties: TableProperties,
options: StreamConfigurationOptions = None
) -> ZerobusStream
```
Creates a new ingestion stream using a custom headers provider. For advanced use cases only where custom headers are required. Returns a `ZerobusStream` instance.

---

Expand All @@ -586,7 +667,22 @@ async def create_stream(
options: StreamConfigurationOptions = None
) -> ZerobusStream
```
Creates a new ingestion stream. Returns a `ZerobusStream` instance.
Creates a new ingestion stream using OAuth 2.0 Client Credentials authentication.

Automatically includes these headers:
- `"authorization": "Bearer <oauth_token>"` (fetched via OAuth 2.0 Client Credentials flow)
- `"x-databricks-zerobus-table-name": "<table_name>"`

Returns a `ZerobusStream` instance.

```python
async def create_stream_with_headers_provider(
headers_provider: HeadersProvider,
table_properties: TableProperties,
options: StreamConfigurationOptions = None
) -> ZerobusStream
```
Creates a new ingestion stream using a custom headers provider. For advanced use cases only where custom headers are required. Returns a `ZerobusStream` instance.

---

Expand Down Expand Up @@ -683,6 +779,57 @@ Returns the protobuf message descriptor.

---

### HeadersProvider

Abstract base class for providing headers to gRPC streams. For advanced use cases only.

**Abstract Method:**

```python
@abstractmethod
def get_headers(self) -> List[Tuple[str, str]]
```
Returns headers for gRPC metadata as a list of (header_name, header_value) tuples.

**Built-in Implementation:**

#### OAuthHeadersProvider

OAuth 2.0 Client Credentials flow headers provider (used internally by `create_stream()`).

```python
OAuthHeadersProvider(
workspace_id: str,
workspace_url: str,
table_name: str,
client_id: str,
client_secret: str
)
```

Returns these headers:
- `"authorization": "Bearer <oauth_token>"` (fetched via OAuth 2.0)
- `"x-databricks-zerobus-table-name": "<table_name>"`

**Custom Implementation (Advanced):**

For advanced use cases requiring custom headers, extend the `HeadersProvider` class:

```python
from zerobus.sdk.shared.headers_provider import HeadersProvider

class MyCustomProvider(HeadersProvider):
def get_headers(self):
return [
("authorization", "Bearer my-token"),
("x-custom-header", "value"),
]
```

Note: Most users should use `create_stream()` with OAuth credentials rather than implementing a custom provider.

---

### StreamConfigurationOptions

Configuration options for stream behavior.
Expand Down
43 changes: 42 additions & 1 deletion examples/async_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
Use Case: Best for applications already using asyncio, async web frameworks (FastAPI, aiohttp),
or when integrating ingestion with other asynchronous operations in an event loop.

Authentication:
- Uses OAuth 2.0 Client Credentials (standard method)
- Includes example of custom headers provider for advanced use cases

Note: Both sync and async APIs provide the same throughput and durability guarantees.
Choose based on your application's architecture, not performance requirements.
"""
Expand All @@ -20,6 +24,7 @@

from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import StreamConfigurationOptions, TableProperties
from zerobus.sdk.shared.headers_provider import HeadersProvider

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
Expand Down Expand Up @@ -56,6 +61,31 @@ def create_sample_record(index):
return record_pb2.AirQuality(device_name=f"sensor-{index % 10}", temp=20 + (index % 15), humidity=50 + (index % 40))


class CustomHeadersProvider(HeadersProvider):
"""
Example custom headers provider for advanced use cases.

Note: OAuth 2.0 Client Credentials (via create_stream()) is the standard
authentication method. Use this only if you have specific requirements
for custom headers (e.g., custom metadata, existing token management, etc.).
"""

def __init__(self, custom_token: str):
self.custom_token = custom_token

def get_headers(self):
"""
Return custom headers for gRPC metadata.

Returns:
List of (header_name, header_value) tuples
"""
return [
("authorization", f"Bearer {self.custom_token}"),
("x-custom-header", "custom-value"),
]


def create_ack_callback():
"""
Creates an acknowledgment callback that logs progress.
Expand Down Expand Up @@ -111,8 +141,19 @@ async def main():
table_properties = TableProperties(TABLE_NAME, record_pb2.AirQuality.DESCRIPTOR)
logger.info(f"✓ Table properties configured for: {TABLE_NAME}")

# Step 4: Create a stream
# Step 4: Create a stream with OAuth 2.0 authentication
#
# Standard method: OAuth 2.0 Client Credentials
# The SDK automatically includes these headers:
# - "authorization": "Bearer <oauth_token>" (fetched via OAuth 2.0 Client Credentials flow)
# - "x-databricks-zerobus-table-name": "<table_name>"
stream = await sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

# Advanced: Custom headers provider (for special use cases only)
# Uncomment to use custom headers instead of OAuth:
# custom_provider = CustomHeadersProvider(custom_token="your-custom-token")
# stream = await sdk.create_stream_with_headers_provider(custom_provider, table_properties, options)

logger.info(f"✓ Stream created: {stream.stream_id}")

# Step 5: Ingest records asynchronously
Expand Down
43 changes: 42 additions & 1 deletion examples/sync_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

Use Case: Best for applications that don't use asyncio or prefer blocking I/O patterns.

Authentication:
- Uses OAuth 2.0 Client Credentials (standard method)
- Includes example of custom headers provider for advanced use cases

Note: Both sync and async APIs provide the same throughput and durability guarantees.
Choose based on your application's architecture, not performance requirements.
"""
Expand All @@ -17,6 +21,7 @@
import record_pb2

from zerobus.sdk.shared import StreamConfigurationOptions, TableProperties
from zerobus.sdk.shared.headers_provider import HeadersProvider
from zerobus.sdk.sync import ZerobusSdk

# Configure logging
Expand Down Expand Up @@ -54,6 +59,31 @@ def create_sample_record(index):
return record_pb2.AirQuality(device_name=f"sensor-{index % 10}", temp=20 + (index % 15), humidity=50 + (index % 40))


class CustomHeadersProvider(HeadersProvider):
"""
Example custom headers provider for advanced use cases.

Note: OAuth 2.0 Client Credentials (via create_stream()) is the standard
authentication method. Use this only if you have specific requirements
for custom headers (e.g., custom metadata, existing token management, etc.).
"""

def __init__(self, custom_token: str):
self.custom_token = custom_token

def get_headers(self):
"""
Return custom headers for gRPC metadata.

Returns:
List of (header_name, header_value) tuples
"""
return [
("authorization", f"Bearer {self.custom_token}"),
("x-custom-header", "custom-value"),
]


def main():
print("Starting synchronous ingestion example...")
print("=" * 60)
Expand Down Expand Up @@ -93,8 +123,19 @@ def main():
)
logger.info("✓ Stream configuration created")

# Step 4: Create a stream
# Step 4: Create a stream with OAuth 2.0 authentication
#
# Standard method: OAuth 2.0 Client Credentials
# The SDK automatically includes these headers:
# - "authorization": "Bearer <oauth_token>" (fetched via OAuth 2.0 Client Credentials flow)
# - "x-databricks-zerobus-table-name": "<table_name>"
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

# Advanced: Custom headers provider (for special use cases only)
# Uncomment to use custom headers instead of OAuth:
# custom_provider = CustomHeadersProvider(custom_token="your-custom-token")
# stream = sdk.create_stream_with_headers_provider(custom_provider, table_properties, options)

logger.info(f"✓ Stream created: {stream.stream_id}")

# Step 5: Ingest records synchronously
Expand Down
4 changes: 2 additions & 2 deletions tests/mock_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ async def wrapper(test_instance, *args, **kwargs):
# Run for async SDK
logging.info(f"\nRunning test '{test_func.__name__}' for Async SDK")
async_sdk_manager = SdkManager(ZerobusSdkAsync)
with patch("zerobus.sdk.aio.zerobus_sdk.get_zerobus_token", return_value="mock_token"):
with patch("zerobus.sdk.shared.headers_provider.get_zerobus_token", return_value="mock_token"):
await test_func(test_instance, async_sdk_manager, *args, **kwargs)
logging.info(f"Test '{test_func.__name__}' PASSED for Async SDK")

Expand All @@ -482,7 +482,7 @@ async def wrapper(test_instance, *args, **kwargs):
loop = asyncio.get_running_loop()

def run_sync_test_in_isolation():
with patch("zerobus.sdk.sync.zerobus_sdk.get_zerobus_token", return_value="mock_token"):
with patch("zerobus.sdk.shared.headers_provider.get_zerobus_token", return_value="mock_token"):
asyncio.run(coro)

await loop.run_in_executor(None, run_sync_test_in_isolation)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def create_ephemeral_stream(generator, **kwargs):

with (
patch("grpc.secure_channel", return_value=mock_channel),
patch("zerobus.sdk.sync.zerobus_sdk.get_zerobus_token", return_value="mock_token"),
patch("zerobus.sdk.shared.headers_provider.get_zerobus_token", return_value="mock_token"),
):
sdk_handle = ZerobusSdk(SERVER_ENDPOINT, unity_catalog_url="https://test.unity.catalog.url")
try:
Expand Down
Loading