Skip to content

Commit

Permalink
chore: replace async client with synchronous client and enhance data …
Browse files Browse the repository at this point in the history
…processing
  • Loading branch information
guptadev21 committed Jan 30, 2025
1 parent fd25713 commit 7e70739
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions settings/source.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import asyncio
import json
import ast
from typing import Any, Type, Dict
from benedict import benedict
from munch import Munch

import yaml
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource

from rapyuta_io_sdk_v2 import AsyncClient, Configuration
from rapyuta_io_sdk_v2 import Client, Configuration
import base64


class ConfigTreeSource(PydanticBaseSettingsSource):
Expand All @@ -20,7 +21,7 @@ def __init__(
local_file: str = None,
):
super().__init__(settings_cls)
self.async_client = AsyncClient(config=config)
self.client = Client(config=config)
self.tree_name = tree_name
self.local_file = local_file
self._top_prefix = key_prefix
Expand All @@ -33,16 +34,18 @@ def __init__(

processed_data = self._process_config_tree(raw_data=self.config_tree)

if processed_data is None:
raise ValueError("processed_data cannot be None")

self._configtree_data = benedict(processed_data).unflatten(separator="/")
# print(self._configtree_data)

# * Methods to fetch Configtree
async def fetch_from_api(self):
def fetch_from_api(self):
"""
Load the configuration tree from an external API.
"""
try:
response = await self.async_client.get_configtree(
response = self.client.get_configtree(
name=self.tree_name,
include_data=True,
content_types="kv",
Expand All @@ -53,7 +56,6 @@ async def fetch_from_api(self):
except Exception as e:
raise ValueError(f"Failed to fetch configuration tree from API: {e}")

# TODO: Check remaining for this
def load_from_local_file(self):
"""
Load the configuration tree from a local JSON or YAML file.
Expand All @@ -79,18 +81,28 @@ def load_from_local_file(self):
raise ValueError(f"Failed to load configuration tree from file: {e}")

def load_config_tree(self):
"""
Load the configuration tree from either an API or a local file.
"""
if self.local_file:
self.load_from_local_file()

else:
asyncio.run(self.fetch_from_api())
self.fetch_from_api()

# * Methods to process the tree

def _extract_data_api(self, input_data: Dict[str, Any] = None) -> Dict[str, Any]:
return {key: value.get("data") for key, value in input_data.items() if "data" in value}
return {
key: self._decode_and_convert(value.get("data"))
for key, value in input_data.items()
if "data" in value
}

def _decode_and_convert(self, encoded_data: str) -> Any:
decoded_data = base64.b64decode(encoded_data).decode("utf-8")

try:
# Safely evaluate the decoded string to Python data structures (e.g., lists, dicts)
return ast.literal_eval(decoded_data)
except (ValueError, SyntaxError):
return decoded_data

def _extract_data_local(self, input_data: Dict[str, Any] = None) -> Dict[str, Any]:
for key, value in input_data.items():
Expand Down

0 comments on commit 7e70739

Please sign in to comment.