110 changes: 110 additions & 0 deletions rock/cli/command/model_service.py
@@ -0,0 +1,110 @@
import argparse
import logging
import sys
from pathlib import Path

from rock.cli.command.command import Command
from rock.sdk.model.client import ModelClient
from rock.sdk.model.service import ModelService

logger = logging.getLogger(__name__)


class ModelServiceCommand(Command):
name = "model-service"

DEFAULT_MODEL_SERVICE_DIR = "data/cli/model"
DEFAULT_MODEL_SERVICE_PID_FILE = DEFAULT_MODEL_SERVICE_DIR + "/pid.txt"

async def arun(self, args: argparse.Namespace):
if not Path(self.DEFAULT_MODEL_SERVICE_DIR).exists():
Path(self.DEFAULT_MODEL_SERVICE_DIR).mkdir(parents=True, exist_ok=True)

sub_command = args.model_service_command
if "start" == sub_command:
if await self._model_service_exist():
logger.error("model service already exist, please run 'rock model-service stop' first")
sys.exit(1)
return
logger.info("start model service")
model_service = ModelService()
pid = await model_service.start()
logger.info(f"model service started, pid: {pid}")
with open(self.DEFAULT_MODEL_SERVICE_PID_FILE, "w") as f:
                f.write(str(pid))
return
if "watch-agent" == sub_command:
agent_pid = args.pid
logger.info(f"start to watch agent process, pid: {agent_pid}")
model_service = ModelService()
await model_service.start_watch_agent(agent_pid)
return
if "stop" == sub_command:
if not await self._model_service_exist():
logger.info("model service not exist, skip")
return
logger.info("start to stop model service")
with open(self.DEFAULT_MODEL_SERVICE_PID_FILE) as f:
pid = f.read()
model_service = ModelService()
await model_service.stop(pid)
Path(self.DEFAULT_MODEL_SERVICE_PID_FILE).unlink()
logger.info("model service stopped")
return
if "anti-call-llm" == sub_command:
logger.debug("start to anti call llm")
model_client = ModelClient()
next_request = await model_client.anti_call_llm(index=args.index, last_response=args.response)
            # important: print next_request to stdout and nothing else; callers read the next request from stdout
print(next_request)

async def _model_service_exist(self) -> bool:
exist = Path(self.DEFAULT_MODEL_SERVICE_PID_FILE).exists()
if exist:
with open(self.DEFAULT_MODEL_SERVICE_PID_FILE) as f:
pid = f.read()
logger.info(f"model service exist, pid: {pid}.")
return exist

@staticmethod
async def add_parser_to(subparsers: argparse._SubParsersAction):
model_service_parser = subparsers.add_parser(
"model-service",
description="model-service command",
)
model_service_subparsers = model_service_parser.add_subparsers(
dest="model_service_command",
)

# rock model-service start
model_service_subparsers.add_parser(
"start",
help="start model service",
)

watch_agent_parser = model_service_subparsers.add_parser(
"watch-agent",
help="watch agent status, if stopped, send SESSION_END",
)
watch_agent_parser.add_argument(
"--pid",
required=True,
type=int,
help="pid of agent process to watch",
)

# rock model-service stop
model_service_subparsers.add_parser(
"stop",
help="stop model service",
)

# rock model-service anti-call-llm --index N [--response RESPONSE]
anti_call_llm_parser = model_service_subparsers.add_parser(
"anti-call-llm",
help="anti call llm, input is response of llm, output is the next request to llm",
)
        anti_call_llm_parser.add_argument(
            "--index", required=True, type=int, help="index of the last LLM call, starting from 0"
        )
        anti_call_llm_parser.add_argument("--response", required=False, help="response of the last LLM call")
4 changes: 4 additions & 0 deletions rock/env_vars.py
@@ -36,6 +36,9 @@
ROCK_CLI_LOAD_PATHS: str = str(Path(__file__).parent / "cli" / "command")
ROCK_CLI_DEFAULT_CONFIG_PATH: str

# Model Service Config
ROCK_MODEL_SERVICE_DATA_DIR: str


environment_variables: dict[str, Callable[[], Any]] = {
"ROCK_LOGGING_PATH": lambda: os.getenv("ROCK_LOGGING_PATH"),
@@ -70,6 +73,7 @@
"ROCK_CLI_DEFAULT_CONFIG_PATH": lambda: os.getenv(
"ROCK_CLI_DEFAULT_CONFIG_PATH", Path.home() / ".rock" / "config.ini"
),
"ROCK_MODEL_SERVICE_DATA_DIR": lambda: os.getenv("ROCK_MODEL_SERVICE_DATA_DIR", "/data/logs"),
}


131 changes: 131 additions & 0 deletions rock/sdk/model/client.py
@@ -0,0 +1,131 @@
import asyncio
import json
import logging
import time
from pathlib import Path

from rock.sdk.model.server.config import (
LOG_FILE,
REQUEST_END_MARKER,
REQUEST_START_MARKER,
RESPONSE_END_MARKER,
RESPONSE_START_MARKER,
SESSION_END_MARKER,
)

logger = logging.getLogger(__name__)


class ModelClient:
def __init__(self, log_file_name: str | None = None):
if log_file_name is not None:
self.log_file = log_file_name
else:
self.log_file = LOG_FILE

async def anti_call_llm(self, index: int, last_response: str | None = None) -> str:
"""anti call llm, input is response of llm, output is the next request to llm"""
if index < 0:
raise ValueError("index must be greater than 0")

if 0 == index:
if last_response is not None:
raise ValueError("last_response must be None when index is 0")
await self.wait_for_first_request()
return await self.pop_request(index + 1)

if last_response is None:
raise ValueError("last_response must not be None when index is greater than 0")
await self.push_response(index=index, last_response=last_response)
return await self.pop_request(index + 1)

async def push_response(self, index: int, last_response: str):
content = await self._construct_response(last_response, index)
last_response_line = await self.read_last_response_line()
if last_response_line is None:
await self._append_response(content)
return
response_json, meta = await self.parse_response_line(last_response_line)
last_response_index: int = int(meta.get("index")) # type: ignore
if index < last_response_index:
raise ValueError(f"index {index} must not be smaller than last_response_index {last_response_index}")
if index == last_response_index:
logger.debug(f"response index {index} already exists, skip. content is {last_response_line}")
return
await self._append_response(content)

async def _append_response(self, content: str):
with open(self.log_file, "a") as f:
f.write(content)

async def pop_request(self, index: int) -> str:
while True:
last_request_line = await self.read_last_request_line()
request_json, meta = await self.parse_request_line(last_request_line)
if SESSION_END_MARKER == request_json:
return SESSION_END_MARKER
if meta.get("index") == index:
return request_json
logger.debug(f"Last request {last_request_line} is not the index {index} we want, waiting...")
await asyncio.sleep(1)

async def parse_request_line(self, line_content: str) -> tuple[str, dict]:
if SESSION_END_MARKER in line_content:
return SESSION_END_MARKER, {}

meta_json = line_content.split(REQUEST_END_MARKER)[1]
request_json = line_content.split(REQUEST_END_MARKER)[0].split(REQUEST_START_MARKER)[1]
meta = json.loads(meta_json)
return request_json, meta

async def parse_response_line(self, line_content: str) -> tuple[str, dict]:
meta_json = line_content.split(RESPONSE_END_MARKER)[1]
response_json = line_content.split(RESPONSE_END_MARKER)[0].split(RESPONSE_START_MARKER)[1]
meta = json.loads(meta_json)
return response_json, meta

async def read_last_request_line(self) -> str:
with open(self.log_file) as f:
lines = f.readlines()
line_index = len(lines) - 1
while line_index >= 0:
line = lines[line_index]
if REQUEST_START_MARKER in line or SESSION_END_MARKER in line:
return line
line_index -= 1
raise ValueError(f"No request found in log file {self.log_file}")

async def read_last_response_line(self) -> str | None:
with open(self.log_file) as f:
lines = f.readlines()
line_index = len(lines) - 1
while line_index >= 0:
line = lines[line_index]
if RESPONSE_START_MARKER in line:
return line
line_index -= 1
return None

async def wait_for_first_request(self):
while True:
if not Path(self.log_file).exists():
logger.debug(f"Log file {self.log_file} not found, waiting...")
await asyncio.sleep(1)
continue
with open(self.log_file) as f:
lines = f.readlines()
if len(lines) == 0:
logger.debug(f"Log file {self.log_file} is empty, waiting for the first request...")
await asyncio.sleep(1)
continue
else:
return

async def _construct_response(self, last_response: str, index: int) -> str:
meta = {
"timestamp": int(time.time() * 1000),
"index": index,
}
meta_json = json.dumps(meta, ensure_ascii=False)
content = f"{RESPONSE_START_MARKER}{last_response}{RESPONSE_END_MARKER}{meta_json}\n"
return content
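
To make the marker-based log protocol concrete, the lines below build a request line in the shape parse_request_line expects and a response line like the one _construct_response writes. The request payload structure ({"messages": ...}) and the exact server-side write format are inferred from the parsing code above, so treat this as an illustrative assumption rather than a spec.

# Illustrative only: line format inferred from ModelClient's parse/construct methods above.
import json

REQUEST_START_MARKER = "LLM_REQUEST_START"
REQUEST_END_MARKER = "LLM_REQUEST_END"
RESPONSE_START_MARKER = "LLM_RESPONSE_START"
RESPONSE_END_MARKER = "LLM_RESPONSE_END"

# What a request line appended by the model service is assumed to look like:
request_payload = json.dumps({"messages": [{"role": "user", "content": "hello"}]})
request_meta = json.dumps({"timestamp": 1700000000000, "index": 1})
request_line = f"{REQUEST_START_MARKER}{request_payload}{REQUEST_END_MARKER}{request_meta}\n"

# What _construct_response writes for the matching response:
response_payload = json.dumps({"content": "hi there"})
response_meta = json.dumps({"timestamp": 1700000000500, "index": 1})
response_line = f"{RESPONSE_START_MARKER}{response_payload}{RESPONSE_END_MARKER}{response_meta}\n"

# parse_request_line(request_line) would return (request_payload, {"timestamp": ..., "index": 1});
# parse_response_line(response_line) does the same for the response side.
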
22 changes: 22 additions & 0 deletions rock/sdk/model/server/config.py
@@ -0,0 +1,22 @@
"""Configuration for the LLM model service."""

from rock import env_vars

# Service configuration
SERVICE_HOST = "0.0.0.0"
SERVICE_PORT = 8080

# Log file configuration
LOG_DIR = env_vars.ROCK_MODEL_SERVICE_DATA_DIR
LOG_FILE = LOG_DIR + "/LLMService.log"

# Polling configuration
POLLING_INTERVAL_SECONDS = 0.1 # seconds
REQUEST_TIMEOUT = None  # no timeout; wait indefinitely for requests

# Request markers
REQUEST_START_MARKER = "LLM_REQUEST_START"
REQUEST_END_MARKER = "LLM_REQUEST_END"
RESPONSE_START_MARKER = "LLM_RESPONSE_START"
RESPONSE_END_MARKER = "LLM_RESPONSE_END"
SESSION_END_MARKER = "SESSION_END"