diff --git a/docs/overrides/home.html b/docs/overrides/home.html index 2ead15529..601290c3c 100644 --- a/docs/overrides/home.html +++ b/docs/overrides/home.html @@ -563,10 +563,10 @@

Streaming Messages

🐍 {% if config.theme.language == "zh" %}

Python 优先

-

通过 PyO3 提供完整的 Python API。使用 @as_actor 装饰器将任何类转换为分布式 Actor。

+

通过 PyO3 提供完整的 Python API。使用 @remote 装饰器将任何类转换为分布式 Actor。

{% else %}

Python First

-

Full Python API via PyO3. Use the @as_actor decorator to turn any class into a distributed actor.

+

Full Python API via PyO3. Use the @remote decorator to turn any class into a distributed actor.

{% endif %}
@@ -607,9 +607,9 @@

{% if config.theme.language == "zh" %}安装 Pulsing{% else %}Install Pulsin

{% if config.theme.language == "zh" %}创建你的第一个 Actor{% else %}Create Your First Actor{% endif %}

-
from pulsing.actor import as_actor, create_actor_system, SystemConfig
+              
from pulsing.actor import init, shutdown, remote
 
-@as_actor
+@remote
 class Calculator:
     def __init__(self, initial: int = 0):
         self.value = initial
@@ -631,16 +631,16 @@ 

{% if config.theme.language == "zh" %}运行它{% else %}Run It{% endif %}import asyncio async def main(): - system = await create_actor_system(SystemConfig.standalone()) + await init() - calc = await Calculator.local(system, initial=100) + calc = await Calculator.spawn(initial=100) result = await calc.add(50) # 150 result = await calc.add(25) # 175 value = await calc.get() # 175 print(f"Final value: {value}") - await system.shutdown() + await shutdown() asyncio.run(main())

diff --git a/docs/src/api_reference.md b/docs/src/api_reference.md index 6028b6197..3792f875a 100644 --- a/docs/src/api_reference.md +++ b/docs/src/api_reference.md @@ -195,12 +195,14 @@ class ActorRef: ## Decorators -### @as_actor +### @remote Convert a class into an Actor automatically. ```python -@as_actor +from pulsing.actor import init, shutdown, remote + +@remote class MyActor: def __init__(self, value: int): self.value = value @@ -210,12 +212,17 @@ class MyActor: async def process(self, data: str) -> dict: return {"result": data.upper()} + +async def main(): + await init() + actor = await MyActor.spawn(value=10) + print(await actor.get()) # 10 + await shutdown() ``` After decoration, the class provides: -- `local(system, **kwargs) -> ActorRef`: Create actor locally -- `remote(system, **kwargs) -> ActorRef`: Create actor remotely (or locally if single node) +- `spawn(**kwargs) -> ActorRef`: Create actor (uses global system from `init()`) ## Functions diff --git a/docs/src/api_reference.zh.md b/docs/src/api_reference.zh.md index abdb92f9c..c31767b9a 100644 --- a/docs/src/api_reference.zh.md +++ b/docs/src/api_reference.zh.md @@ -195,12 +195,14 @@ class ActorRef: ## 装饰器 -### @as_actor +### @remote 自动将类转换为 Actor。 ```python -@as_actor +from pulsing.actor import init, shutdown, remote + +@remote class MyActor: def __init__(self, value: int): self.value = value @@ -210,12 +212,17 @@ class MyActor: async def process(self, data: str) -> dict: return {"result": data.upper()} + +async def main(): + await init() + actor = await MyActor.spawn(value=10) + print(await actor.get()) # 10 + await shutdown() ``` 装饰后,类提供: -- `local(system, **kwargs) -> ActorRef`: 本地创建 actor -- `remote(system, **kwargs) -> ActorRef`: 远程创建 actor(单节点时回退到本地) +- `spawn(**kwargs) -> ActorRef`: 创建 actor(使用 `init()` 初始化的全局系统) ## 函数 diff --git a/docs/src/design/as-actor-decorator.md b/docs/src/design/as-actor-decorator.md index cbfecc325..4911c0a2f 100644 --- a/docs/src/design/as-actor-decorator.md +++ b/docs/src/design/as-actor-decorator.md @@ -1,8 +1,8 @@ -# @as_actor 装饰器设计文档 +# @remote 装饰器设计文档 ## 概述 -`@as_actor` 是一个便利装饰器,将普通 Python 类自动转换为分布式 Actor。它提供类似 Ray 的编程体验,让用户无需关心底层的消息传递细节。 +`@remote` 是一个便利装饰器,将普通 Python 类自动转换为分布式 Actor。它提供类似 Ray 的编程体验,让用户无需关心底层的消息传递细节。 ## 设计目标 @@ -15,16 +15,16 @@ ```mermaid flowchart TB - subgraph Decorator["@as_actor 装饰器"] - A["@as_actor
class Counter"] --> B["ActorClass(Counter)"] + subgraph Decorator["@remote 装饰器"] + A["@remote
class Counter"] --> B["ActorClass(Counter)"] end - B --> C[".local()"] - B --> D[".remote()"] + B --> C[".spawn()"] + B --> D[".local()"] B --> E[".__call__()"] - C --> F["本地 Actor"] - D --> G["远程 Actor"] + C --> F["Actor (全局系统)"] + D --> G["Actor (指定系统)"] E --> H["普通实例"] F --> I["ActorProxy"] @@ -47,9 +47,9 @@ flowchart TB | 方法 | 说明 | |------|------| -| `.local(system, *args, **kwargs)` | 在本地节点创建 Actor | -| `.remote(system, *args, **kwargs)` | 在远程节点创建 Actor(随机选择) | -| `(*args, **kwargs)` | 直接调用,返回普通实例(非 Actor) | +| `.spawn(**kwargs)` | 使用全局系统创建 Actor(推荐) | +| `.local(system, **kwargs)` | 在指定系统本地节点创建 Actor | +| `(**kwargs)` | 直接调用,返回普通实例(非 Actor) | ### 2. ActorProxy @@ -65,16 +65,16 @@ response = await actor_ref.ask(msg) return response["result"] ``` -### 3. SystemActor +### 3. PythonActorService 每个节点自动创建的系统 Actor,负责处理远程 Actor 创建请求: ```mermaid sequenceDiagram participant A as Node A - participant B as Node B (SystemActor) + participant B as Node B (PythonActorService) - A->>A: Counter.remote(system) + A->>A: Counter.spawn() A->>A: 选择远程节点 B A->>B: CreateActor {class, args} @@ -103,9 +103,9 @@ class _WrappedActor: ### 基本用法 ```python -from pulsing.actor import as_actor, create_actor_system, SystemConfig +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Counter: def __init__(self, init_value=0): self.value = init_value @@ -118,45 +118,43 @@ class Counter: return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) + await init() # 创建 Actor - counter = await Counter.local(system, init_value=10) + counter = await Counter.spawn(init_value=10) # 调用方法 print(await counter.get()) # 10 print(await counter.increment(5)) # 15 + + await shutdown() ``` -### 远程创建 +### 集群模式 ```python -# 节点 A -system_a = await create_actor_system( - SystemConfig.with_addr("0.0.0.0:8001") -) - -# 节点 B(加入集群) -system_b = await create_actor_system( - SystemConfig.with_addr("0.0.0.0:8002").with_seeds(["127.0.0.1:8001"]) -) - -# 在节点 B 上执行,Actor 会创建在节点 A -counter = await Counter.remote(system_b, init_value=100) -print(await counter.get()) # 100 (数据在节点 A) +# 节点 A (Seed) +await init(addr="0.0.0.0:8001") + +# 节点 B (加入集群) +await init(addr="0.0.0.0:8002", seeds=["127.0.0.1:8001"]) + +# 创建 Actor (可能在任意节点) +counter = await Counter.spawn(init_value=100) +print(await counter.get()) # 100 ``` ### 异步方法支持 ```python -@as_actor +@remote class AsyncWorker: async def fetch_data(self, url): async with aiohttp.ClientSession() as session: async with session.get(url) as resp: return await resp.text() -worker = await AsyncWorker.local(system) +worker = await AsyncWorker.spawn() html = await worker.fetch_data("https://example.com") ``` @@ -164,10 +162,11 @@ html = await worker.fetch_data("https://example.com") ```python # 指定名称,便于其他节点发现 -counter = await Counter.local(system, name="global_counter", init_value=0) +counter = await Counter.spawn(name="global_counter", init_value=0) # 其他地方可以通过名称解析 -ref = await system.resolve_named("global_counter") +from pulsing.actor import get_system +ref = await get_system().resolve_named("global_counter") ``` ### 作为普通类使用 @@ -180,19 +179,18 @@ counter.increment(5) # 同步调用,返回 15 ## 与 Ray 对比 -| 特性 | Ray | Pulsing @as_actor | -|------|-----|-------------------| -| 装饰器 | `@ray.remote` | `@as_actor` | -| 本地创建 | `Counter.remote()` | `Counter.local(system)` | -| 远程创建 | `Counter.options(resources=...).remote()` | `Counter.remote(system)` | +| 特性 | Ray | Pulsing @remote | +|------|-----|-----------------| +| 装饰器 | `@ray.remote` | `@remote` | +| 创建 Actor | `Counter.remote()` | `await Counter.spawn()` | | 方法调用 | `ray.get(counter.increment.remote(5))` | `await counter.increment(5)` | -| 调度策略 | 自动调度 + 资源约束 | 随机选择远程节点 | +| 调度策略 | 自动调度 + 资源约束 | 随机选择节点 | | 依赖 | Ray 集群 | 无外部依赖 | ## 限制 -1. **方法参数必须可 JSON 序列化** - 参数通过 JSON 传输 -2. **返回值必须可 JSON 序列化** - 结果通过 JSON 返回 +1. **方法参数必须可序列化** - 参数通过 pickle 传输 +2. **返回值必须可序列化** - 结果通过 pickle 返回 3. **类必须在所有节点可导入** - 远程创建需要目标节点能 import 该类 4. **不支持属性访问** - 只能调用方法,不能直接访问 `counter.value` @@ -201,7 +199,7 @@ counter.increment(5) # 同步调用,返回 15 ### 1. 方法设计 ```python -@as_actor +@remote class GoodDesign: # ✓ 返回完整状态 def get_state(self): @@ -226,7 +224,7 @@ except RuntimeError as e: ```python # 并行调用多个 Actor -workers = [await Worker.local(system, id=i) for i in range(4)] +workers = [await Worker.spawn(id=i) for i in range(4)] results = await asyncio.gather(*[w.process(data) for w in workers]) ``` @@ -234,7 +232,7 @@ results = await asyncio.gather(*[w.process(data) for w in workers]) ### 类注册表 -`@as_actor` 装饰时,类会被注册到全局表: +`@remote` 装饰时,类会被注册到全局表: ```python _actor_class_registry["__main__.Counter"] = Counter @@ -279,7 +277,7 @@ _actor_class_registry["__main__.Counter"] = Counter ## 未来规划 -- [ ] 支持指定目标节点 `Counter.remote(system, node_id=xxx)` +- [ ] 支持指定目标节点 `Counter.spawn(node_id=xxx)` - [ ] 支持负载均衡策略(轮询、最小负载等) -- [ ] 支持资源约束 `Counter.local(system, num_cpus=2)` -- [ ] 支持 Actor 池 `CounterPool.local(system, size=4)` +- [ ] 支持资源约束 `Counter.spawn(num_cpus=2)` +- [ ] 支持 Actor 池 `CounterPool.spawn(size=4)` diff --git a/docs/src/design/as-actor-decorator.zh.md b/docs/src/design/as-actor-decorator.zh.md index cbfecc325..dbd9cba87 100644 --- a/docs/src/design/as-actor-decorator.zh.md +++ b/docs/src/design/as-actor-decorator.zh.md @@ -1,8 +1,8 @@ -# @as_actor 装饰器设计文档 +# @remote 装饰器设计文档 ## 概述 -`@as_actor` 是一个便利装饰器,将普通 Python 类自动转换为分布式 Actor。它提供类似 Ray 的编程体验,让用户无需关心底层的消息传递细节。 +`@remote` 是一个便利装饰器,将普通 Python 类自动转换为分布式 Actor。它提供类似 Ray 的编程体验,让用户无需关心底层的消息传递细节。 ## 设计目标 @@ -15,16 +15,16 @@ ```mermaid flowchart TB - subgraph Decorator["@as_actor 装饰器"] - A["@as_actor
class Counter"] --> B["ActorClass(Counter)"] + subgraph Decorator["@remote 装饰器"] + A["@remote
class Counter"] --> B["ActorClass(Counter)"] end - B --> C[".local()"] - B --> D[".remote()"] + B --> C[".spawn()"] + B --> D[".local()"] B --> E[".__call__()"] - C --> F["本地 Actor"] - D --> G["远程 Actor"] + C --> F["Actor (全局系统)"] + D --> G["Actor (指定系统)"] E --> H["普通实例"] F --> I["ActorProxy"] @@ -47,9 +47,9 @@ flowchart TB | 方法 | 说明 | |------|------| -| `.local(system, *args, **kwargs)` | 在本地节点创建 Actor | -| `.remote(system, *args, **kwargs)` | 在远程节点创建 Actor(随机选择) | -| `(*args, **kwargs)` | 直接调用,返回普通实例(非 Actor) | +| `.spawn(**kwargs)` | 使用全局系统创建 Actor(推荐) | +| `.local(system, **kwargs)` | 在指定系统本地节点创建 Actor | +| `(**kwargs)` | 直接调用,返回普通实例(非 Actor) | ### 2. ActorProxy @@ -65,16 +65,16 @@ response = await actor_ref.ask(msg) return response["result"] ``` -### 3. SystemActor +### 3. PythonActorService 每个节点自动创建的系统 Actor,负责处理远程 Actor 创建请求: ```mermaid sequenceDiagram - participant A as Node A - participant B as Node B (SystemActor) + participant A as 节点 A + participant B as 节点 B (PythonActorService) - A->>A: Counter.remote(system) + A->>A: Counter.spawn() A->>A: 选择远程节点 B A->>B: CreateActor {class, args} @@ -103,9 +103,9 @@ class _WrappedActor: ### 基本用法 ```python -from pulsing.actor import as_actor, create_actor_system, SystemConfig +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Counter: def __init__(self, init_value=0): self.value = init_value @@ -118,45 +118,43 @@ class Counter: return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) + await init() # 创建 Actor - counter = await Counter.local(system, init_value=10) + counter = await Counter.spawn(init_value=10) # 调用方法 print(await counter.get()) # 10 print(await counter.increment(5)) # 15 + + await shutdown() ``` -### 远程创建 +### 集群模式 ```python -# 节点 A -system_a = await create_actor_system( - SystemConfig.with_addr("0.0.0.0:8001") -) - -# 节点 B(加入集群) -system_b = await create_actor_system( - SystemConfig.with_addr("0.0.0.0:8002").with_seeds(["127.0.0.1:8001"]) -) - -# 在节点 B 上执行,Actor 会创建在节点 A -counter = await Counter.remote(system_b, init_value=100) -print(await counter.get()) # 100 (数据在节点 A) +# 节点 A (种子节点) +await init(addr="0.0.0.0:8001") + +# 节点 B (加入集群) +await init(addr="0.0.0.0:8002", seeds=["127.0.0.1:8001"]) + +# 创建 Actor (可能在任意节点) +counter = await Counter.spawn(init_value=100) +print(await counter.get()) # 100 ``` ### 异步方法支持 ```python -@as_actor +@remote class AsyncWorker: async def fetch_data(self, url): async with aiohttp.ClientSession() as session: async with session.get(url) as resp: return await resp.text() -worker = await AsyncWorker.local(system) +worker = await AsyncWorker.spawn() html = await worker.fetch_data("https://example.com") ``` @@ -164,10 +162,11 @@ html = await worker.fetch_data("https://example.com") ```python # 指定名称,便于其他节点发现 -counter = await Counter.local(system, name="global_counter", init_value=0) +counter = await Counter.spawn(name="global_counter", init_value=0) # 其他地方可以通过名称解析 -ref = await system.resolve_named("global_counter") +from pulsing.actor import get_system +ref = await get_system().resolve_named("global_counter") ``` ### 作为普通类使用 @@ -180,19 +179,18 @@ counter.increment(5) # 同步调用,返回 15 ## 与 Ray 对比 -| 特性 | Ray | Pulsing @as_actor | -|------|-----|-------------------| -| 装饰器 | `@ray.remote` | `@as_actor` | -| 本地创建 | `Counter.remote()` | `Counter.local(system)` | -| 远程创建 | `Counter.options(resources=...).remote()` | `Counter.remote(system)` | +| 特性 | Ray | Pulsing @remote | +|------|-----|-----------------| +| 装饰器 | `@ray.remote` | `@remote` | +| 创建 Actor | `Counter.remote()` | `await Counter.spawn()` | | 方法调用 | `ray.get(counter.increment.remote(5))` | `await counter.increment(5)` | -| 调度策略 | 自动调度 + 资源约束 | 随机选择远程节点 | +| 调度策略 | 自动调度 + 资源约束 | 随机选择节点 | | 依赖 | Ray 集群 | 无外部依赖 | ## 限制 -1. **方法参数必须可 JSON 序列化** - 参数通过 JSON 传输 -2. **返回值必须可 JSON 序列化** - 结果通过 JSON 返回 +1. **方法参数必须可序列化** - 参数通过 pickle 传输 +2. **返回值必须可序列化** - 结果通过 pickle 返回 3. **类必须在所有节点可导入** - 远程创建需要目标节点能 import 该类 4. **不支持属性访问** - 只能调用方法,不能直接访问 `counter.value` @@ -201,7 +199,7 @@ counter.increment(5) # 同步调用,返回 15 ### 1. 方法设计 ```python -@as_actor +@remote class GoodDesign: # ✓ 返回完整状态 def get_state(self): @@ -219,14 +217,14 @@ try: result = await counter.increment(5) except RuntimeError as e: # 远程方法抛出的异常会被包装为 RuntimeError - print(f"Remote error: {e}") + print(f"远程错误: {e}") ``` ### 3. 批量操作 ```python # 并行调用多个 Actor -workers = [await Worker.local(system, id=i) for i in range(4)] +workers = [await Worker.spawn(id=i) for i in range(4)] results = await asyncio.gather(*[w.process(data) for w in workers]) ``` @@ -234,7 +232,7 @@ results = await asyncio.gather(*[w.process(data) for w in workers]) ### 类注册表 -`@as_actor` 装饰时,类会被注册到全局表: +`@remote` 装饰时,类会被注册到全局表: ```python _actor_class_registry["__main__.Counter"] = Counter @@ -279,7 +277,7 @@ _actor_class_registry["__main__.Counter"] = Counter ## 未来规划 -- [ ] 支持指定目标节点 `Counter.remote(system, node_id=xxx)` +- [ ] 支持指定目标节点 `Counter.spawn(node_id=xxx)` - [ ] 支持负载均衡策略(轮询、最小负载等) -- [ ] 支持资源约束 `Counter.local(system, num_cpus=2)` -- [ ] 支持 Actor 池 `CounterPool.local(system, size=4)` +- [ ] 支持资源约束 `Counter.spawn(num_cpus=2)` +- [ ] 支持 Actor 池 `CounterPool.spawn(size=4)` diff --git a/docs/src/examples/index.md b/docs/src/examples/index.md index acc37c2f0..ce57f0791 100644 --- a/docs/src/examples/index.md +++ b/docs/src/examples/index.md @@ -10,18 +10,18 @@ The simplest possible Pulsing application: ```python import asyncio -from pulsing.actor import as_actor, create_actor_system, SystemConfig +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class HelloActor: def greet(self, name: str) -> str: return f"Hello, {name}!" async def main(): - system = await create_actor_system(SystemConfig.standalone()) - hello = await HelloActor.local(system) + await init() + hello = await HelloActor.spawn() print(await hello.greet("World")) - await system.shutdown() + await shutdown() asyncio.run(main()) ``` @@ -31,7 +31,7 @@ asyncio.run(main()) A stateful actor that maintains a counter: ```python -@as_actor +@remote class Counter: def __init__(self, initial: int = 0): self.value = initial @@ -59,7 +59,9 @@ class Counter: Two actors communicating across nodes: ```python -@as_actor +from pulsing.actor import init, shutdown, remote, get_system + +@remote class PingActor: def __init__(self, pong_ref=None): self.pong = pong_ref @@ -72,7 +74,7 @@ class PingActor: print(f"Received: {response}") return self.count -@as_actor +@remote class PongActor: def pong(self, n: int) -> str: return f"pong-{n}" @@ -80,28 +82,18 @@ class PongActor: async def main(): # Node 1: Start pong actor - system1 = await create_actor_system( - SystemConfig.with_addr("0.0.0.0:8000") - ) - pong = await PongActor.local(system1) - await system1.register("pong", pong, public=True) - - # Node 2: Start ping actor - system2 = await create_actor_system( - SystemConfig.with_addr("0.0.0.0:8001") - .with_seeds(["127.0.0.1:8000"]) - ) - await asyncio.sleep(1.0) # Wait for cluster sync - - pong_ref = await system2.find("pong") - ping = await PingActor.local(system2, pong_ref=pong_ref) - - # Run ping-pong - count = await ping.start_ping(10) - print(f"Completed {count} ping-pong rounds") - - await system1.shutdown() - await system2.shutdown() + await init(addr="0.0.0.0:8000") + pong = await PongActor.spawn() + system = get_system() + await system.register("pong", pong, public=True) + + # Node 2: Would run on another machine + # await init(addr="0.0.0.0:8001", seeds=["node1:8000"]) + # pong_ref = await get_system().find("pong") + # ping = await PingActor.spawn(pong_ref=pong_ref) + # await ping.start_ping(10) + + await shutdown() ``` ### Worker Pool @@ -109,7 +101,7 @@ async def main(): Distributing work across multiple workers: ```python -@as_actor +@remote class Worker: def __init__(self, worker_id: int): self.worker_id = worker_id @@ -132,15 +124,15 @@ class Worker: } -@as_actor +@remote class WorkerPool: def __init__(self): self.workers = [] self.next_worker = 0 - async def initialize(self, system, num_workers: int): + async def initialize(self, num_workers: int): for i in range(num_workers): - worker = await Worker.local(system, worker_id=i) + worker = await Worker.spawn(worker_id=i) self.workers.append(worker) async def submit(self, task: dict) -> dict: @@ -166,7 +158,7 @@ class WorkerPool: ### Simple LLM Service ```python -@as_actor +@remote class LLMService: def __init__(self, model_name: str): self.model_name = model_name @@ -209,7 +201,7 @@ class LLMService: ### Load-Balanced LLM Cluster ```python -@as_actor +@remote class LLMRouter: """Routes requests to LLM workers with load balancing.""" diff --git a/docs/src/examples/index.zh.md b/docs/src/examples/index.zh.md index d670dfcac..7b703e940 100644 --- a/docs/src/examples/index.zh.md +++ b/docs/src/examples/index.zh.md @@ -10,18 +10,18 @@ ```python import asyncio -from pulsing.actor import as_actor, create_actor_system, SystemConfig +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class HelloActor: def greet(self, name: str) -> str: return f"Hello, {name}!" async def main(): - system = await create_actor_system(SystemConfig.standalone()) - hello = await HelloActor.local(system) + await init() + hello = await HelloActor.spawn() print(await hello.greet("World")) - await system.shutdown() + await shutdown() asyncio.run(main()) ``` @@ -31,7 +31,7 @@ asyncio.run(main()) 维护计数器的有状态 Actor: ```python -@as_actor +@remote class Counter: def __init__(self, initial: int = 0): self.value = initial @@ -59,7 +59,9 @@ class Counter: 两个 Actor 跨节点通信: ```python -@as_actor +from pulsing.actor import init, shutdown, remote, get_system + +@remote class PingActor: def __init__(self, pong_ref=None): self.pong = pong_ref @@ -72,7 +74,7 @@ class PingActor: print(f"收到: {response}") return self.count -@as_actor +@remote class PongActor: def pong(self, n: int) -> str: return f"pong-{n}" @@ -80,28 +82,18 @@ class PongActor: async def main(): # 节点 1:启动 pong Actor - system1 = await create_actor_system( - SystemConfig.with_addr("0.0.0.0:8000") - ) - pong = await PongActor.local(system1) - await system1.register("pong", pong, public=True) - - # 节点 2:启动 ping Actor - system2 = await create_actor_system( - SystemConfig.with_addr("0.0.0.0:8001") - .with_seeds(["127.0.0.1:8000"]) - ) - await asyncio.sleep(1.0) # 等待集群同步 - - pong_ref = await system2.find("pong") - ping = await PingActor.local(system2, pong_ref=pong_ref) - - # 运行 ping-pong - count = await ping.start_ping(10) - print(f"完成 {count} 轮 ping-pong") - - await system1.shutdown() - await system2.shutdown() + await init(addr="0.0.0.0:8000") + pong = await PongActor.spawn() + system = get_system() + await system.register("pong", pong, public=True) + + # 节点 2:在另一台机器上运行 + # await init(addr="0.0.0.0:8001", seeds=["node1:8000"]) + # pong_ref = await get_system().find("pong") + # ping = await PingActor.spawn(pong_ref=pong_ref) + # await ping.start_ping(10) + + await shutdown() ``` ### 工作池 @@ -109,7 +101,7 @@ async def main(): 将工作分配给多个工作器: ```python -@as_actor +@remote class Worker: def __init__(self, worker_id: int): self.worker_id = worker_id @@ -132,15 +124,15 @@ class Worker: } -@as_actor +@remote class WorkerPool: def __init__(self): self.workers = [] self.next_worker = 0 - async def initialize(self, system, num_workers: int): + async def initialize(self, num_workers: int): for i in range(num_workers): - worker = await Worker.local(system, worker_id=i) + worker = await Worker.spawn(worker_id=i) self.workers.append(worker) async def submit(self, task: dict) -> dict: @@ -160,7 +152,7 @@ class WorkerPool: ### 简单 LLM 服务 ```python -@as_actor +@remote class LLMService: def __init__(self, model_name: str): self.model_name = model_name diff --git a/docs/src/guide/actor_system.md b/docs/src/guide/actor_system.md index 0c9ba1d15..97b7897c9 100644 --- a/docs/src/guide/actor_system.md +++ b/docs/src/guide/actor_system.md @@ -31,14 +31,14 @@ async def main(): await system.shutdown() ``` -### Using @as_actor Decorator +### Using @remote Decorator -The `@as_actor` decorator automatically converts a class into an Actor: +The `@remote` decorator automatically converts a class into an Actor: ```python -from pulsing.actor import as_actor, SystemConfig, create_actor_system +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Counter: def __init__(self, value: int = 0): self.value = value @@ -51,10 +51,10 @@ class Counter: return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) - counter = await Counter.local(system, value=10) + await init() + counter = await Counter.spawn(value=10) print(await counter.get()) # 10 - await system.shutdown() + await shutdown() ``` ## Message Passing @@ -102,7 +102,7 @@ await system.stop("my-actor") Actors encapsulate state. Each actor instance has its own isolated state: ```python -@as_actor +@remote class StatefulActor: def __init__(self): self.counter = 0 diff --git a/docs/src/guide/actor_system.zh.md b/docs/src/guide/actor_system.zh.md index f234a66c1..0b8fc7e43 100644 --- a/docs/src/guide/actor_system.zh.md +++ b/docs/src/guide/actor_system.zh.md @@ -31,14 +31,14 @@ async def main(): await system.shutdown() ``` -### 使用 @as_actor 装饰器 +### 使用 @remote 装饰器 -`@as_actor` 装饰器可以自动将类转换为 Actor: +`@remote` 装饰器可以自动将类转换为 Actor: ```python -from pulsing.actor import as_actor, SystemConfig, create_actor_system +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Counter: def __init__(self, value: int = 0): self.value = value @@ -51,10 +51,10 @@ class Counter: return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) - counter = await Counter.local(system, value=10) + await init() + counter = await Counter.spawn(value=10) print(await counter.get()) # 10 - await system.shutdown() + await shutdown() ``` ## 消息传递 @@ -102,7 +102,7 @@ await system.stop("my-actor") Actor 封装状态。每个 actor 实例都有自己独立的状态: ```python -@as_actor +@remote class StatefulActor: def __init__(self): self.counter = 0 diff --git a/docs/src/guide/actors.md b/docs/src/guide/actors.md index 1ec4a2b97..4b512b611 100644 --- a/docs/src/guide/actors.md +++ b/docs/src/guide/actors.md @@ -177,14 +177,23 @@ sequenceDiagram ## Creating Actors -### Method 1: Using @as_actor Decorator (Recommended) +Pulsing offers two API styles to suit different needs: -The `@as_actor` decorator is the simplest way to create actors: +| API | Import | Style | Best For | +|-----|--------|-------|----------| +| **Native Async** | `from pulsing.actor import ...` | `async/await` | New projects, maximum performance | +| **Ray-Compatible** | `from pulsing.compat import ray` | Synchronous | Migrating from Ray, quick prototyping | + +--- + +### Method 1: Native Async API with @remote (Recommended) + +The `@remote` decorator with `init()/shutdown()` provides the cleanest async experience: ```python -from pulsing.actor import as_actor, SystemConfig, create_actor_system +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Calculator: """A simple calculator actor.""" @@ -208,31 +217,87 @@ class Calculator: """Get the current value.""" return self.value - def get_history(self) -> list: - """Get operation history.""" - return self.history - async def main(): - system = await create_actor_system(SystemConfig.standalone()) + # Simple initialization + await init() - # Create local actor - calc = await Calculator.local(system, initial_value=100) + # Spawn actor with await + calc = await Calculator.spawn(initial_value=100) - # Call methods + # Call methods with await - clean and intuitive result = await calc.add(50) # 150 result = await calc.subtract(30) # 120 value = await calc.get_value() # 120 - await system.shutdown() + await shutdown() ``` -**Benefits of @as_actor:** +**Benefits of Native API:** -- No boilerplate code -- Methods become endpoints automatically -- Type hints preserved +- Clean `async/await` syntax +- Maximum performance (no sync wrapper overhead) - IDE autocompletion works +- Type hints preserved + +--- + +### Method 2: Ray-Compatible API (Easy Migration) + +For users migrating from Ray, Pulsing provides a drop-in compatible API: + +```python +from pulsing.compat import ray + +ray.init() + +@ray.remote +class Calculator: + """A simple calculator actor.""" + + def __init__(self, initial_value: int = 0): + self.value = initial_value + + def add(self, n: int) -> int: + self.value += n + return self.value + + def get_value(self) -> int: + return self.value + + +# Create actor instance +calc = Calculator.remote(initial_value=100) + +# Call methods with .remote() and ray.get() +result = ray.get(calc.add.remote(50)) # 150 +value = ray.get(calc.get_value.remote()) # 150 + +ray.shutdown() +``` + +**Migration from Ray:** + +```python +# Before (Ray): +import ray +ray.init() + +# After (Pulsing - just change import!): +from pulsing.compat import ray +ray.init() +``` + +**Benefits of Ray-Compatible API:** + +- One-line migration from Ray +- Familiar synchronous interface +- Same `ray.get()`, `ray.put()`, `ray.wait()` semantics +- Great for existing Ray codebases + +--- + +### Method 3: Using Actor Base Class (Advanced) ### Method 2: Using Actor Base Class @@ -277,7 +342,7 @@ async def main(): Actors can have async methods for I/O operations: ```python -@as_actor +@remote class AsyncWorker: """An actor with async methods.""" @@ -331,7 +396,7 @@ graph TD Send a message and wait for response: ```python -# Using @as_actor +# Using @remote result = await calc.add(10) # Using Actor base class @@ -377,7 +442,7 @@ For continuous data flow: from pulsing.actor import StreamMessage # Actor that returns a streaming response -@as_actor +@remote class TokenGenerator: async def generate(self, prompt: str) -> Message: stream_msg, writer = StreamMessage.create("tokens") @@ -559,7 +624,7 @@ await system.spawn(HelperActor(), "internal-helper", public=False) Most common pattern for actor communication: ```python -@as_actor +@remote class RequestHandler: async def handle_request(self, request: dict) -> dict: # Validate @@ -578,7 +643,7 @@ class RequestHandler: Actors maintain state between calls: ```python -@as_actor +@remote class SessionManager: def __init__(self): self.sessions = {} @@ -614,7 +679,7 @@ class SessionManager: Distribute work across multiple actors: ```python -@as_actor +@remote class WorkerPool: def __init__(self, num_workers: int): self.workers = [] @@ -645,7 +710,7 @@ class WorkerPool: Chain actors for data processing: ```python -@as_actor +@remote class PipelineStage: def __init__(self, next_stage=None): self.next_stage = next_stage @@ -676,7 +741,7 @@ result = await stage1.process(input_data) ### 5. LLM Inference Service Pattern ```python -@as_actor +@remote class LLMService: """Actor for LLM inference.""" @@ -728,7 +793,7 @@ class LLMService: ```python # ✅ Good: All state in __init__ -@as_actor +@remote class GoodActor: def __init__(self): self.counter = 0 @@ -743,7 +808,7 @@ class GoodActor: # ❌ Bad: Global state global_state = {} -@as_actor +@remote class BadActor: def update(self, key, value): global_state[key] = value # Race conditions! @@ -752,7 +817,7 @@ class BadActor: ### 3. Error Handling ```python -@as_actor +@remote class ResilientActor: async def risky_operation(self, data: dict) -> dict: try: @@ -770,7 +835,7 @@ class ResilientActor: ### 4. Performance Tips ```python -@as_actor +@remote class OptimizedActor: def __init__(self): # ✅ Pre-allocate resources @@ -819,12 +884,12 @@ async def test_calculator(): ## Quick Reference -### Basic Actor with @as_actor +### Native Async API (Recommended) ```python -from pulsing.actor import as_actor, create_actor_system, SystemConfig +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class MyActor: def __init__(self, param: int): self.param = param @@ -833,10 +898,30 @@ class MyActor: return self.param + arg async def main(): - system = await create_actor_system(SystemConfig.standalone()) - actor = await MyActor.local(system, param=10) + await init() + actor = await MyActor.spawn(param=10) result = await actor.method(5) # 15 - await system.shutdown() + await shutdown() +``` + +### Ray-Compatible API + +```python +from pulsing.compat import ray + +ray.init() + +@ray.remote +class MyActor: + def __init__(self, param: int): + self.param = param + + def method(self, arg: int) -> int: + return self.param + arg + +actor = MyActor.remote(param=10) +result = ray.get(actor.method.remote(5)) # 15 +ray.shutdown() ``` ### Cluster Setup @@ -893,10 +978,11 @@ await system.shutdown() 1. **Actors are Isolated**: Private state, message-based communication 2. **Sequential Processing**: One message at a time, FIFO ordering -3. **@as_actor Decorator**: Simplest way to create actors +3. **Two API Styles**: Native async (`@remote`) or Ray-compatible (`@ray.remote`) 4. **Location Transparent**: Same code for local/remote actors 5. **Zero External Dependencies**: No etcd, NATS, or Consul needed 6. **Built-in Clustering**: SWIM protocol for discovery +7. **Easy Migration**: One-line import change from Ray ### Actor Lifecycle Recap diff --git a/docs/src/guide/actors.zh.md b/docs/src/guide/actors.zh.md index 4300505b6..9ddf8598d 100644 --- a/docs/src/guide/actors.zh.md +++ b/docs/src/guide/actors.zh.md @@ -169,14 +169,23 @@ sequenceDiagram ## 创建 Actor -### 方法 1:使用 @as_actor 装饰器(推荐) +Pulsing 提供两种 API 风格以满足不同需求: -`@as_actor` 装饰器是创建 Actor 的最简单方式: +| API | 导入方式 | 风格 | 适用场景 | +|-----|---------|------|----------| +| **原生异步** | `from pulsing.actor import ...` | `async/await` | 新项目,追求极致性能 | +| **Ray 兼容** | `from pulsing.compat import ray` | 同步调用 | 从 Ray 迁移,快速原型 | + +--- + +### 方法 1:原生异步 API 使用 @remote(推荐) + +`@remote` 装饰器配合 `init()/shutdown()` 提供最简洁的异步体验: ```python -from pulsing.actor import as_actor, SystemConfig, create_actor_system +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Calculator: """一个简单的计算器 Actor。""" @@ -202,25 +211,85 @@ class Calculator: async def main(): - system = await create_actor_system(SystemConfig.standalone()) + # 简洁的初始化 + await init() - # 创建本地 Actor - calc = await Calculator.local(system, initial_value=100) + # 使用 await 创建 actor + calc = await Calculator.spawn(initial_value=100) - # 调用方法 + # await 调用方法 - 简洁直观 result = await calc.add(50) # 150 result = await calc.subtract(30) # 120 value = await calc.get_value() # 120 - await system.shutdown() + await shutdown() ``` -**@as_actor 的优点:** +**原生 API 的优点:** -- 无样板代码 -- 方法自动成为端点 -- 保留类型提示 +- 简洁的 `async/await` 语法 +- 最佳性能(无同步包装开销) - IDE 自动补全正常工作 +- 保留类型提示 + +--- + +### 方法 2:Ray 兼容 API(轻松迁移) + +对于从 Ray 迁移的用户,Pulsing 提供兼容的 API: + +```python +from pulsing.compat import ray + +ray.init() + +@ray.remote +class Calculator: + """一个简单的计算器 Actor。""" + + def __init__(self, initial_value: int = 0): + self.value = initial_value + + def add(self, n: int) -> int: + self.value += n + return self.value + + def get_value(self) -> int: + return self.value + + +# 创建 actor 实例 +calc = Calculator.remote(initial_value=100) + +# 使用 .remote() 和 ray.get() 调用方法 +result = ray.get(calc.add.remote(50)) # 150 +value = ray.get(calc.get_value.remote()) # 150 + +ray.shutdown() +``` + +**从 Ray 迁移:** + +```python +# 之前 (Ray): +import ray +ray.init() + +# 之后 (Pulsing - 只需改一行导入!): +from pulsing.compat import ray +ray.init() +``` + +**Ray 兼容 API 的优点:** + +- 一行代码完成 Ray 迁移 +- 熟悉的同步接口 +- 相同的 `ray.get()`、`ray.put()`、`ray.wait()` 语义 +- 适合现有 Ray 代码库 + +--- + +### 方法 3:使用 Actor 基类(高级) ### 方法 2:使用 Actor 基类 @@ -265,7 +334,7 @@ async def main(): Actor 可以有用于 I/O 操作的异步方法: ```python -@as_actor +@remote class AsyncWorker: """具有异步方法的 Actor。""" @@ -319,7 +388,7 @@ graph TD 发送消息并等待响应: ```python -# 使用 @as_actor +# 使用 @remote result = await calc.add(10) # 使用 Actor 基类 @@ -346,7 +415,7 @@ do_other_work() from pulsing.actor import StreamMessage # 返回流式响应的 Actor -@as_actor +@remote class TokenGenerator: async def generate(self, prompt: str) -> Message: stream_msg, writer = StreamMessage.create("tokens") @@ -445,7 +514,7 @@ await system.spawn(HelperActor(), "internal-helper", public=False) ### 1. LLM 推理服务模式 ```python -@as_actor +@remote class LLMService: """用于 LLM 推理的 Actor。""" @@ -470,7 +539,7 @@ class LLMService: ### 2. 工作池模式 ```python -@as_actor +@remote class WorkerPool: def __init__(self, num_workers: int): self.workers = [] @@ -506,7 +575,7 @@ class WorkerPool: ### 2. 错误处理 ```python -@as_actor +@remote class ResilientActor: async def risky_operation(self, data: dict) -> dict: try: @@ -525,12 +594,12 @@ class ResilientActor: ## 快速参考 -### 基本 Actor +### 原生异步 API(推荐) ```python -from pulsing.actor import as_actor, create_actor_system, SystemConfig +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class MyActor: def __init__(self, param: int): self.param = param @@ -539,10 +608,30 @@ class MyActor: return self.param + arg async def main(): - system = await create_actor_system(SystemConfig.standalone()) - actor = await MyActor.local(system, param=10) + await init() + actor = await MyActor.spawn(param=10) result = await actor.method(5) # 15 - await system.shutdown() + await shutdown() +``` + +### Ray 兼容 API + +```python +from pulsing.compat import ray + +ray.init() + +@ray.remote +class MyActor: + def __init__(self, param: int): + self.param = param + + def method(self, arg: int) -> int: + return self.param + arg + +actor = MyActor.remote(param=10) +result = ray.get(actor.method.remote(5)) # 15 +ray.shutdown() ``` ### 集群设置 @@ -589,10 +678,11 @@ await system.shutdown() 1. **Actor 是隔离的**:私有状态,基于消息的通信 2. **顺序处理**:一次处理一条消息,FIFO 顺序 -3. **@as_actor 装饰器**:创建 Actor 的最简单方式 +3. **双 API 风格**:原生异步(`@remote`)或 Ray 兼容(`@ray.remote`) 4. **位置透明**:本地/远程 Actor 使用相同代码 5. **零外部依赖**:不需要 etcd、NATS 或 Consul 6. **内置集群**:用于发现的 SWIM 协议 +7. **轻松迁移**:一行导入即可从 Ray 迁移 ### 下一步 diff --git a/docs/src/guide/remote_actors.md b/docs/src/guide/remote_actors.md index ae4661626..84c0ed011 100644 --- a/docs/src/guide/remote_actors.md +++ b/docs/src/guide/remote_actors.md @@ -111,7 +111,7 @@ except Exception as e: ## Example: Distributed Counter ```python -@as_actor +@remote class DistributedCounter: def __init__(self, init_value: int = 0): self.value = init_value diff --git a/docs/src/guide/remote_actors.zh.md b/docs/src/guide/remote_actors.zh.md index 2e0a4771b..8474cc872 100644 --- a/docs/src/guide/remote_actors.zh.md +++ b/docs/src/guide/remote_actors.zh.md @@ -111,7 +111,7 @@ except Exception as e: ## 示例:分布式计数器 ```python -@as_actor +@remote class DistributedCounter: def __init__(self, init_value: int = 0): self.value = init_value diff --git a/docs/src/guide/security.md b/docs/src/guide/security.md index cdc9544b5..9b8804b1d 100644 --- a/docs/src/guide/security.md +++ b/docs/src/guide/security.md @@ -174,12 +174,12 @@ Even with TLS, use network-level security: ```python import os -from pulsing.actor import SystemConfig, create_actor_system, as_actor +from pulsing.actor import init, shutdown, remote # Get passphrase from environment PASSPHRASE = os.environ.get("PULSING_SECRET", None) -@as_actor +@remote class SecureCounter: def __init__(self, init_value: int = 0): self.value = init_value diff --git a/docs/src/guide/security.zh.md b/docs/src/guide/security.zh.md index 46320a401..a3d5c27a5 100644 --- a/docs/src/guide/security.zh.md +++ b/docs/src/guide/security.zh.md @@ -174,12 +174,12 @@ else: ```python import os -from pulsing.actor import SystemConfig, create_actor_system, as_actor +from pulsing.actor import init, shutdown, remote # 从环境变量获取口令 PASSPHRASE = os.environ.get("PULSING_SECRET", None) -@as_actor +@remote class SecureCounter: def __init__(self, init_value: int = 0): self.value = init_value diff --git a/docs/src/index.md b/docs/src/index.md index 29b9721e8..d263ea4e2 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -17,8 +17,9 @@ hide: toc - **SWIM Protocol Discovery** - Built-in gossip-based node discovery and failure detection. - **Location Transparent** - ActorRef supports unified access to local and remote actors. - **Streaming Messages** - Native support for streaming requests and responses. -- **Python First** - Full Python API via PyO3 with `@as_actor` decorator. +- **Python First** - Full Python API via PyO3 with `@remote` decorator. - **High Performance** - Built on Tokio async runtime with HTTP/2 transport. +- **Ray Compatible** - Drop-in replacement API for easy migration from Ray. ## Quick Start @@ -29,9 +30,9 @@ maturin develop ``` ```python -from pulsing.actor import as_actor, create_actor_system, SystemConfig +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Calculator: def __init__(self, initial: int = 0): self.value = initial @@ -41,9 +42,10 @@ class Calculator: return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) - calc = await Calculator.local(system, initial=100) + await init() + calc = await Calculator.spawn(initial=100) result = await calc.add(50) # 150 + await shutdown() ``` ## Use Cases diff --git a/docs/src/index.zh.md b/docs/src/index.zh.md index d4f92c048..0d35ecebf 100644 --- a/docs/src/index.zh.md +++ b/docs/src/index.zh.md @@ -17,8 +17,9 @@ hide: toc - **SWIM 协议发现** - 内置基于 Gossip 的节点发现和故障检测。 - **位置透明** - ActorRef 支持统一访问本地和远程 Actor。 - **流式消息** - 原生支持流式请求和响应。 -- **Python 优先** - 通过 PyO3 提供完整的 Python API,支持 `@as_actor` 装饰器。 +- **Python 优先** - 通过 PyO3 提供完整的 Python API,支持 `@remote` 装饰器。 - **高性能** - 基于 Tokio 异步运行时,使用 HTTP/2 传输。 +- **Ray 兼容** - 提供兼容 API,轻松从 Ray 迁移。 ## 快速开始 @@ -29,9 +30,9 @@ maturin develop ``` ```python -from pulsing.actor import as_actor, create_actor_system, SystemConfig +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Calculator: def __init__(self, initial: int = 0): self.value = initial @@ -41,9 +42,10 @@ class Calculator: return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) - calc = await Calculator.local(system, initial=100) + await init() + calc = await Calculator.spawn(initial=100) result = await calc.add(50) # 150 + await shutdown() ``` ## 使用场景 diff --git a/docs/src/quickstart/index.md b/docs/src/quickstart/index.md index 648ea6a4b..2c1ec4533 100644 --- a/docs/src/quickstart/index.md +++ b/docs/src/quickstart/index.md @@ -51,26 +51,52 @@ graph LR ## Your First Actor (30 seconds) +### Option 1: Native Async API (Recommended) + ```python import asyncio -from pulsing.actor import Actor, SystemConfig, create_actor_system +from pulsing.actor import init, shutdown, remote -class PingPong(Actor): - async def receive(self, msg): - if msg == "ping": - return "pong" - return f"echo: {msg}" +@remote +class Counter: + def __init__(self, value=0): + self.value = value + + def inc(self): + self.value += 1 + return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) - actor = await system.spawn("pingpong", PingPong()) + await init() + counter = await Counter.spawn(value=0) + print(await counter.inc()) # 1 + print(await counter.inc()) # 2 + await shutdown() - print(await actor.ask("ping")) # -> pong - print(await actor.ask("hello")) # -> echo: hello +asyncio.run(main()) +``` - await system.shutdown() +### Option 2: Ray-Compatible API (Easy Migration) -asyncio.run(main()) +```python +from pulsing.compat import ray + +ray.init() + +@ray.remote +class Counter: + def __init__(self, value=0): + self.value = value + + def inc(self): + self.value += 1 + return self.value + +counter = Counter.remote(value=0) +print(ray.get(counter.inc.remote())) # 1 +print(ray.get(counter.inc.remote())) # 2 + +ray.shutdown() ``` **Any Python object** can be a message—strings, dicts, lists, or custom classes. @@ -94,14 +120,19 @@ class Counter(Actor): --- -## @as_actor Decorator +## API Comparison + +| API | Style | Best For | +|-----|-------|----------| +| `pulsing.actor` | Async (`await`) | New projects, performance | +| `pulsing.compat.ray` | Sync (Ray-style) | Ray migration, quick start | -For a more object-oriented API: +### @remote Decorator (Native API) ```python -from pulsing.actor import as_actor +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Counter: def __init__(self, initial=0): self.value = initial @@ -111,9 +142,10 @@ class Counter: return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) - counter = await Counter.local(system, initial=10) + await init() + counter = await Counter.spawn(initial=10) print(await counter.inc(5)) # 15 + await shutdown() ``` --- @@ -146,8 +178,8 @@ result = await worker.ask("do_work") # Same API! |---------|-------------| | **Actor** | Isolated unit with private state | | **Message** | Any Python object | -| **ask/tell** | Request-response / Fire-and-forget | -| **@as_actor** | Method-call style Actor | +| **@remote** | Native async decorator (via `pulsing.actor`) | +| **ray.remote** | Ray-compatible decorator (via `pulsing.compat.ray`) | | **Cluster** | SWIM protocol auto-discovery | --- diff --git a/docs/src/quickstart/index.zh.md b/docs/src/quickstart/index.zh.md index c7f577844..87ae5e933 100644 --- a/docs/src/quickstart/index.zh.md +++ b/docs/src/quickstart/index.zh.md @@ -51,26 +51,52 @@ graph LR ## 第一个 Actor(30秒) +### 方式一:原生异步 API(推荐) + ```python import asyncio -from pulsing.actor import Actor, SystemConfig, create_actor_system +from pulsing.actor import init, shutdown, remote -class PingPong(Actor): - async def receive(self, msg): - if msg == "ping": - return "pong" - return f"echo: {msg}" +@remote +class Counter: + def __init__(self, value=0): + self.value = value + + def inc(self): + self.value += 1 + return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) - actor = await system.spawn("pingpong", PingPong()) + await init() + counter = await Counter.spawn(value=0) + print(await counter.inc()) # 1 + print(await counter.inc()) # 2 + await shutdown() - print(await actor.ask("ping")) # -> pong - print(await actor.ask("hello")) # -> echo: hello +asyncio.run(main()) +``` - await system.shutdown() +### 方式二:Ray 兼容 API(轻松迁移) -asyncio.run(main()) +```python +from pulsing.compat import ray + +ray.init() + +@ray.remote +class Counter: + def __init__(self, value=0): + self.value = value + + def inc(self): + self.value += 1 + return self.value + +counter = Counter.remote(value=0) +print(ray.get(counter.inc.remote())) # 1 +print(ray.get(counter.inc.remote())) # 2 + +ray.shutdown() ``` **任意 Python 对象**都可以作为消息——字符串、字典、列表或自定义类。 @@ -94,14 +120,19 @@ class Counter(Actor): --- -## @as_actor 装饰器 +## API 对比 + +| API | 风格 | 适用场景 | +|-----|------|----------| +| `pulsing.actor` | 异步 (`await`) | 新项目,高性能 | +| `pulsing.compat.ray` | 同步 (Ray 风格) | Ray 迁移,快速上手 | -更面向对象的 API: +### @remote 装饰器(原生 API) ```python -from pulsing.actor import as_actor +from pulsing.actor import init, shutdown, remote -@as_actor +@remote class Counter: def __init__(self, initial=0): self.value = initial @@ -111,9 +142,10 @@ class Counter: return self.value async def main(): - system = await create_actor_system(SystemConfig.standalone()) - counter = await Counter.local(system, initial=10) + await init() + counter = await Counter.spawn(initial=10) print(await counter.inc(5)) # 15 + await shutdown() ``` --- @@ -146,8 +178,8 @@ result = await worker.ask("do_work") # API 完全相同! |------|------| | **Actor** | 具有私有状态的隔离单元 | | **消息** | 任意 Python 对象 | -| **ask/tell** | 请求-响应 / 发后即忘 | -| **@as_actor** | 方法调用风格的 Actor | +| **@remote** | 原生异步装饰器 (via `pulsing.actor`) | +| **ray.remote** | Ray 兼容装饰器 (via `pulsing.compat.ray`) | | **集群** | SWIM 协议自动发现 | --- diff --git a/examples/python/README.md b/examples/python/README.md index 5b68c49cf..ebd599962 100644 --- a/examples/python/README.md +++ b/examples/python/README.md @@ -8,9 +8,79 @@ pip install -e . ## Run +### 原生异步 API (`pulsing.actor`) + +Pulsing 原生 API,简洁高效: + +```bash +# @remote 装饰器 + await 模式 +python examples/python/remote_actor_example.py + +# 原生异步 API 详细示例 +python examples/python/native_async_example.py +``` + +### Ray 兼容 API (`pulsing.compat.ray`) + +一行代码从 Ray 迁移到 Pulsing: + +```bash +# Ray 风格 API,同步接口 +python examples/python/ray_compat_example.py +``` + +### 基础示例 + ```bash python examples/python/ping_pong.py # Basic communication python examples/python/message_patterns.py # RPC and streaming python examples/python/named_actors.py # Service discovery python examples/python/cluster.py # Multi-node (see --help) ``` + +## API 选择 + +| API | 风格 | 适用场景 | +|-----|------|----------| +| `pulsing.actor` | 异步 (`async/await`) | 新项目,高性能需求 | +| `pulsing.compat.ray` | 同步 (Ray 风格) | Ray 迁移,快速上手 | + +### 原生 API 示例 + +```python +from pulsing.actor import init, shutdown, remote + +@remote +class Counter: + def __init__(self, value=0): + self.value = value + def inc(self): + self.value += 1 + return self.value + +async def main(): + await init() + counter = await Counter.spawn(value=0) + print(await counter.inc()) # 1 + await shutdown() +``` + +### Ray 兼容 API 示例 + +```python +from pulsing.compat import ray + +ray.init() + +@ray.remote +class Counter: + def __init__(self, value=0): + self.value = value + def inc(self): + self.value += 1 + return self.value + +counter = Counter.remote(value=0) +print(ray.get(counter.inc.remote())) # 1 +ray.shutdown() +``` diff --git a/examples/python/native_async_example.py b/examples/python/native_async_example.py new file mode 100644 index 000000000..4d0ad3f6d --- /dev/null +++ b/examples/python/native_async_example.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +Pulsing 原生异步 API 示例(推荐) + +展示 pulsing.actor 的简洁异步 API。 + +用法: python examples/python/native_async_example.py +""" + +import asyncio + +# Pulsing 原生 API +from pulsing.actor import init, shutdown, remote + + +@remote +class Counter: + """分布式计数器""" + + def __init__(self, init_value: int = 0): + self.value = init_value + + def get(self) -> int: + return self.value + + def increment(self, n: int = 1) -> int: + self.value += n + return self.value + + +@remote +class Calculator: + """分布式计算器""" + + def add(self, a: int, b: int) -> int: + return a + b + + def multiply(self, a: int, b: int) -> int: + return a * b + + +@remote +class AsyncWorker: + """异步 Worker""" + + def __init__(self, worker_id: str): + self.worker_id = worker_id + self.count = 0 + + async def process(self, data: str) -> dict: + await asyncio.sleep(0.01) # 模拟处理 + self.count += 1 + return { + "worker": self.worker_id, + "input": data, + "output": data.upper(), + "processed": self.count, + } + + +async def main(): + print("=" * 60) + print("Pulsing 原生异步 API 示例") + print("=" * 60) + + # 初始化(简洁!) + await init() + print("✓ Pulsing 已初始化") + + # --- Counter --- + print("\n--- Counter ---") + counter = await Counter.spawn(init_value=10) + + # 直接 await,无需 .remote() + get() + print(f"初始值: {await counter.get()}") + print(f"increment(5): {await counter.increment(5)}") + print(f"最终值: {await counter.get()}") + + # --- Calculator --- + print("\n--- Calculator ---") + calc = await Calculator.spawn() + + print(f"add(10, 20): {await calc.add(10, 20)}") + print(f"multiply(5, 6): {await calc.multiply(5, 6)}") + + # --- 并行调用 --- + print("\n--- 并行调用 ---") + results = await asyncio.gather( + calc.add(1, 2), + calc.add(3, 4), + calc.multiply(5, 6), + ) + print(f"并行结果: {results}") + + # --- AsyncWorker --- + print("\n--- 异步 Worker ---") + worker = await AsyncWorker.spawn(worker_id="worker-001") + result = await worker.process("hello pulsing") + print(f"处理结果: {result}") + + # --- 关闭 --- + await shutdown() + print("\n✓ 完成!") + + +if __name__ == "__main__": + asyncio.run(main()) + + +# ============================================================================= +# API 对比 +# ============================================================================= +# +# | 操作 | Pulsing 原生 (async) | Ray 兼容层 (sync) | +# |----------------|-----------------------------|-----------------------------| +# | 初始化 | await init() | ray.init() | +# | 装饰器 | @remote | @ray.remote | +# | 创建 actor | await Counter.spawn() | Counter.remote() | +# | 调用方法 | await counter.incr() | counter.incr.remote() | +# | 获取结果 | 直接返回 | ray.get(ref) | +# | 关闭 | await shutdown() | ray.shutdown() | +# +# 推荐使用原生 API: +# - 更 Pythonic(标准 async/await) +# - 无需 .remote() + get() 样板代码 +# - 更好的性能(无同步包装开销) +# diff --git a/examples/python/ray_compat_example.py b/examples/python/ray_compat_example.py new file mode 100644 index 000000000..d5c67b0a0 --- /dev/null +++ b/examples/python/ray_compat_example.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +""" +Ray 兼容层示例(迁移用) + +展示如何使用 pulsing.compat.ray 从 Ray 迁移到 Pulsing。 +迁移只需修改一行 import! + +用法: python examples/python/ray_compat_example.py +""" + +# ============================================ +# 从 Ray 迁移:只需改这一行! +# ============================================ +# Before: import ray +# After: +from pulsing.compat import ray + + +@ray.remote +class Counter: + """分布式计数器 (Ray 风格)""" + + def __init__(self, init_value: int = 0): + self.value = init_value + + def get(self) -> int: + return self.value + + def increment(self, n: int = 1) -> int: + self.value += n + return self.value + + +@ray.remote +class Calculator: + """分布式计算器 (Ray 风格)""" + + def add(self, a: int, b: int) -> int: + return a + b + + def multiply(self, a: int, b: int) -> int: + return a * b + + +def main(): + print("=" * 60) + print("Ray 兼容层示例 (from pulsing.compat import ray)") + print("=" * 60) + + # 初始化 (Ray 风格) + ray.init() + print("✓ Pulsing (Ray compat) 已初始化") + + # --- Counter --- + print("\n--- Counter ---") + counter = Counter.remote(init_value=10) + + # Ray 风格调用 + print(f"初始值: {ray.get(counter.get.remote())}") + print(f"increment(5): {ray.get(counter.increment.remote(5))}") + print(f"最终值: {ray.get(counter.get.remote())}") + + # --- Calculator --- + print("\n--- Calculator ---") + calc = Calculator.remote() + + print(f"add(10, 20): {ray.get(calc.add.remote(10, 20))}") + print(f"multiply(5, 6): {ray.get(calc.multiply.remote(5, 6))}") + + # --- 批量获取 --- + print("\n--- 批量获取 ---") + refs = [ + calc.add.remote(1, 2), + calc.add.remote(3, 4), + calc.multiply.remote(5, 6), + ] + results = ray.get(refs) + print(f"批量结果: {results}") + + # --- Object Store --- + print("\n--- put/get ---") + ref = ray.put({"message": "Hello from pulsing.compat.ray!"}) + print(f"结果: {ray.get(ref)}") + + # 关闭 (Ray 风格) + ray.shutdown() + print("\n✓ 完成!") + + +if __name__ == "__main__": + main() + + +# ============================================================================= +# 迁移指南 +# ============================================================================= +# +# Step 1: 修改 import +# ------------------- +# Before: +# import ray +# +# After: +# from pulsing.compat import ray +# +# Step 2: 其余代码完全不变! +# ------------------------- +# ray.init() +# @ray.remote +# Counter.remote() +# counter.incr.remote() +# ray.get(ref) +# ray.shutdown() +# +# ============================================================================= +# 下一步:迁移到原生 API(可选,性能更好) +# ============================================================================= +# +# from pulsing.actor import init, shutdown, remote +# +# await init() +# +# @remote +# class Counter: +# ... +# +# counter = await Counter.spawn() +# result = await counter.incr() # 无需 .remote() + get()! +# +# await shutdown() +# diff --git a/examples/python/remote_actor_example.py b/examples/python/remote_actor_example.py index 1ecea91ce..0f797182d 100644 --- a/examples/python/remote_actor_example.py +++ b/examples/python/remote_actor_example.py @@ -1,6 +1,12 @@ #!/usr/bin/env python3 """ -@as_actor 装饰器示例 +@remote 装饰器示例 (原生异步 API) + +展示 pulsing.actor 的简洁 API: +- await init() 初始化 +- @remote 装饰器 +- await Counter.spawn() 创建 actor +- await counter.method() 调用方法 用法: python examples/python/remote_actor_example.py """ @@ -8,13 +14,13 @@ import asyncio import logging -from pulsing.actor import SystemConfig, as_actor, create_actor_system +from pulsing.actor import init, shutdown, remote logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -@as_actor +@remote class Counter: """分布式计数器""" @@ -33,7 +39,7 @@ def decrement(self, n: int = 1) -> int: return self.value -@as_actor +@remote class KeyValueStore: """分布式键值存储""" @@ -50,7 +56,7 @@ def keys(self) -> list: return list(self.store.keys()) -@as_actor +@remote class AsyncWorker: """支持异步方法""" @@ -69,63 +75,57 @@ def status(self) -> dict: async def main(): print("=" * 60) - print("@remote 装饰器示例") + print("@remote 装饰器示例 (原生异步 API)") print("=" * 60) - system = await create_actor_system(SystemConfig.standalone()) - - try: - # --- Counter --- - print("\n--- Counter (本地创建) ---") - counter = await Counter.local(system, init_value=10) + # 简洁的初始化 + await init() - print(f"初始值: {await counter.get()}") - print(f"increment(5): {await counter.increment(5)}") - print(f"decrement(3): {await counter.decrement(3)}") - print(f"最终值: {await counter.get()}") + # --- Counter --- + print("\n--- Counter ---") + counter = await Counter.spawn(init_value=10) - # --- KeyValueStore --- - print("\n--- KeyValueStore ---") - kv = await KeyValueStore.local(system) + # 直接 await,无需 .remote() + get() + print(f"初始值: {await counter.get()}") + print(f"increment(5): {await counter.increment(5)}") + print(f"decrement(3): {await counter.decrement(3)}") + print(f"最终值: {await counter.get()}") - await kv.put("name", "Pulsing") - await kv.put("version", "0.1.0") + # --- KeyValueStore --- + print("\n--- KeyValueStore ---") + kv = await KeyValueStore.spawn() - print(f"name: {await kv.get('name')}") - print(f"version: {await kv.get('version')}") - print(f"keys: {await kv.keys()}") + await kv.put("name", "Pulsing") + await kv.put("version", "0.7.0") - # --- AsyncWorker --- - print("\n--- AsyncWorker ---") - worker = await AsyncWorker.local(system, worker_id="worker-001") + print(f"name: {await kv.get('name')}") + print(f"version: {await kv.get('version')}") + print(f"keys: {await kv.keys()}") - result = await worker.process("hello") - print(f"处理结果: {result}") + # --- AsyncWorker --- + print("\n--- AsyncWorker ---") + worker = await AsyncWorker.spawn(worker_id="worker-001") - status = await worker.status() - print(f"状态: {status}") + result = await worker.process("hello") + print(f"处理结果: {result}") - # --- 并行调用 --- - print("\n--- 并行调用 ---") - workers = [ - await AsyncWorker.local(system, worker_id=f"worker-{i}") for i in range(3) - ] + status = await worker.status() + print(f"状态: {status}") - tasks = [w.process(f"task-{i}") for i, w in enumerate(workers)] - results = await asyncio.gather(*tasks) + # --- 并行调用 --- + print("\n--- 并行调用 ---") + workers = [await AsyncWorker.spawn(worker_id=f"worker-{i}") for i in range(3)] - for r in results: - print(f" {r['worker']}: {r['input']} -> {r['output']}") + tasks = [w.process(f"task-{i}") for i, w in enumerate(workers)] + results = await asyncio.gather(*tasks) - # --- 远程创建(单节点会 fallback 到本地)--- - print("\n--- 远程创建(单节点 fallback)---") - remote_counter = await Counter.remote(system, init_value=100) - print(f"远程 Counter: {await remote_counter.get()}") + for r in results: + print(f" {r['worker']}: {r['input']} -> {r['output']}") - print("\n✓ 完成!") + print("\n✓ 完成!") - finally: - await system.shutdown() + # 关闭 + await shutdown() if __name__ == "__main__": diff --git a/python/pulsing/__init__.py b/python/pulsing/__init__.py index 38ed0671a..6ea933b06 100644 --- a/python/pulsing/__init__.py +++ b/python/pulsing/__init__.py @@ -1,8 +1,41 @@ """ Pulsing - Distributed Actor Framework +Two API styles: + +1. Native async API (recommended): + from pulsing.actor import init, shutdown, remote + + await init() + + @remote + class Counter: + def __init__(self, init=0): self.value = init + def incr(self): self.value += 1; return self.value + + counter = await Counter.spawn(init=10) + result = await counter.incr() + + await shutdown() + +2. Ray-compatible sync API (for migration): + from pulsing.compat import ray + + ray.init() + + @ray.remote + class Counter: + def __init__(self, init=0): self.value = init + def incr(self): self.value += 1; return self.value + + counter = Counter.remote(init=10) + result = ray.get(counter.incr.remote()) + + ray.shutdown() + Submodules: -- pulsing.actor: Actor system core (ActorSystem, Actor, Message, etc.) +- pulsing.actor: Native async API (recommended) +- pulsing.compat.ray: Ray-compatible sync API (for migration) """ __version__ = "0.7.0" diff --git a/python/pulsing/actor/__init__.py b/python/pulsing/actor/__init__.py index 1e8fe33cd..609fef05a 100644 --- a/python/pulsing/actor/__init__.py +++ b/python/pulsing/actor/__init__.py @@ -1,11 +1,23 @@ """ Pulsing Actor System - Python bindings for distributed actor framework -Provides: -- ActorSystem: Manage actors and cluster membership -- Actor: Base class for implementing actors -- Message/StreamMessage: Single and streaming message types -- ActorRef: Reference to local or remote actors +Simple API: + from pulsing.actor import init, shutdown, remote + + await init() + + @remote + class Counter: + def __init__(self, init=0): self.value = init + def incr(self): self.value += 1; return self.value + + counter = await Counter.spawn(init=10) + result = await counter.incr() + + await shutdown() + +Advanced API: + from pulsing.actor import ActorSystem, Actor, Message, SystemConfig """ import asyncio @@ -27,7 +39,82 @@ # ============================================================================= -# Timeout utilities for cancellation support (方案 2+3) +# Global system for simple API +# ============================================================================= + +_global_system: ActorSystem = None + + +async def init( + addr: str = None, + *, + seeds: list[str] = None, + passphrase: str = None, +) -> ActorSystem: + """Initialize Pulsing actor system + + Args: + addr: Bind address (e.g., "0.0.0.0:8000"). None for standalone mode. + seeds: Seed nodes to join cluster + passphrase: Enable TLS with this passphrase + + Returns: + ActorSystem instance + + Example: + # Standalone mode + await init() + + # Cluster mode with TLS + await init(addr="0.0.0.0:8000", passphrase="my-secret") + + # Join existing cluster + await init(addr="0.0.0.0:8001", seeds=["192.168.1.1:8000"]) + """ + global _global_system + + if _global_system is not None: + return _global_system + + # Build config + if addr: + config = SystemConfig.with_addr(addr) + else: + config = SystemConfig.standalone() + + if seeds: + config = config.with_seeds(seeds) + + if passphrase: + config = config.with_passphrase(passphrase) + + _global_system = await create_actor_system(config) + return _global_system + + +async def shutdown() -> None: + """Shutdown the global actor system""" + global _global_system + + if _global_system is not None: + await _global_system.shutdown() + _global_system = None + + +def get_system() -> ActorSystem: + """Get the global actor system (must call init() first)""" + if _global_system is None: + raise RuntimeError("Actor system not initialized. Call 'await init()' first.") + return _global_system + + +def is_initialized() -> bool: + """Check if the global actor system is initialized""" + return _global_system is not None + + +# ============================================================================= +# Timeout utilities for cancellation support # ============================================================================= # Default timeout for ask operations (seconds) @@ -94,7 +181,6 @@ async def tell_with_timeout( ActorClass, ActorProxy, PythonActorService, - as_actor, get_metrics, get_node_info, health_check, @@ -104,7 +190,13 @@ async def tell_with_timeout( ) __all__ = [ - # Core types + # Simple API (recommended) + "init", + "shutdown", + "get_system", + "is_initialized", + "remote", # @remote decorator (recommended) + # Core types (advanced) "ActorSystem", "NodeId", "ActorId", @@ -125,11 +217,9 @@ async def tell_with_timeout( "ask_with_timeout", "tell_with_timeout", "DEFAULT_ASK_TIMEOUT", - # Actor decorator - "as_actor", + # Actor decorator internals "ActorClass", "ActorProxy", - "remote", # Alias for backward compatibility # System helper functions "list_actors", "get_metrics", diff --git a/python/pulsing/actor/remote.py b/python/pulsing/actor/remote.py index ff1b4d5fb..63daae022 100644 --- a/python/pulsing/actor/remote.py +++ b/python/pulsing/actor/remote.py @@ -1,10 +1,10 @@ """ -@as_actor decorator - Ray-like distributed object wrapper +@remote decorator - Ray-like distributed object wrapper Usage: - from pulsing.actor import as_actor, create_actor_system, SystemConfig + from pulsing.actor import init, shutdown, remote - @as_actor + @remote class Counter: def __init__(self, init_value=0): self.value = init_value @@ -13,16 +13,16 @@ def increment(self, n=1): self.value += n return self.value - system = await create_actor_system(config) + async def main(): + await init() - # Local creation - counter = await Counter.local(system, init_value=10) + # Create actor + counter = await Counter.spawn(init_value=10) - # Remote creation (randomly selects a remote node) - counter = await Counter.remote(system, init_value=10) + # Call methods (automatically converted to actor messages) + result = await counter.increment(5) # Returns 15 - # Call methods (automatically converted to actor messages) - result = await counter.increment(5) # Returns 15 + await shutdown() """ import asyncio @@ -261,7 +261,18 @@ def factory(): class ActorClass: - """Actor class wrapper""" + """Actor class wrapper + + Provides two ways to create actors: + + 1. Simple API (uses global system): + await init() + counter = await Counter.spawn(init=10) + + 2. Explicit system: + system = await create_actor_system(config) + counter = await Counter.local(system, init=10) + """ def __init__( self, @@ -286,6 +297,39 @@ def __init__( # Register class _actor_class_registry[self._class_name] = cls + async def spawn( + self, + *args, + name: str | None = None, + **kwargs, + ) -> ActorProxy: + """Create actor using global system (simple API) + + Must call `await init()` before using this method. + + Example: + from pulsing.actor import init, remote + + await init() + + @remote + class Counter: + def __init__(self, init=0): self.value = init + def incr(self): self.value += 1; return self.value + + counter = await Counter.spawn(init=10) + result = await counter.incr() + """ + # Import here to avoid circular import + from . import _global_system + + if _global_system is None: + raise RuntimeError( + "Actor system not initialized. Call 'await init()' first." + ) + + return await self.local(_global_system, *args, name=name, **kwargs) + async def local( self, system: ActorSystem, @@ -293,7 +337,7 @@ async def local( name: str | None = None, **kwargs, ) -> ActorProxy: - """Create actor locally. + """Create actor locally with explicit system. Note: Use create_actor_system() to create ActorSystem, which automatically registers PythonActorService. @@ -393,7 +437,7 @@ def __call__(self, *args, **kwargs): return self._cls(*args, **kwargs) -def as_actor( +def remote( cls: type[T] | None = None, *, restart_policy: str = "never", @@ -401,7 +445,7 @@ def as_actor( min_backoff: float = 0.1, max_backoff: float = 30.0, ) -> ActorClass: - """@as_actor decorator + """@remote decorator Converts a regular class into a distributed deployable Actor. @@ -412,7 +456,7 @@ def as_actor( - max_backoff: maximum backoff in seconds (default: 30.0) Example: - @as_actor(restart_policy="on-failure", max_restarts=5) + @remote(restart_policy="on-failure", max_restarts=5) class Counter: ... """ @@ -492,8 +536,6 @@ async def ping(system: ActorSystem, node_id: int | None = None) -> dict: return resp.to_json() -# Keep `remote` as alias (backward compatibility) -remote = as_actor RemoteClass = ActorClass # Keep old name as alias (backward compatibility) SystemActor = PythonActorService diff --git a/python/pulsing/compat/__init__.py b/python/pulsing/compat/__init__.py new file mode 100644 index 000000000..84da33c38 --- /dev/null +++ b/python/pulsing/compat/__init__.py @@ -0,0 +1,24 @@ +""" +Pulsing Compatibility Layer + +Provides Ray-compatible API for easy migration. + +Usage: + from pulsing.compat import ray + + ray.init() + + @ray.remote + class Counter: + def __init__(self, init=0): self.value = init + def incr(self): self.value += 1; return self.value + + counter = Counter.remote(init=10) + result = ray.get(counter.incr.remote()) + + ray.shutdown() +""" + +from . import ray + +__all__ = ["ray"] diff --git a/python/pulsing/compat/ray.py b/python/pulsing/compat/ray.py new file mode 100644 index 000000000..a90ed8e87 --- /dev/null +++ b/python/pulsing/compat/ray.py @@ -0,0 +1,360 @@ +""" +Ray-compatible API for Pulsing + +This module provides a Ray-like synchronous API for easy migration. +For new projects, we recommend using the native async API in pulsing.actor. + +Migration from Ray: + # Before (Ray) + import ray + ray.init() + + @ray.remote + class Counter: + def __init__(self, init=0): self.value = init + def incr(self): self.value += 1; return self.value + + counter = Counter.remote(init=10) + result = ray.get(counter.incr.remote()) + ray.shutdown() + + # After (Pulsing compat) + from pulsing.compat import ray # Only change this line! + + ray.init() + + @ray.remote + class Counter: + def __init__(self, init=0): self.value = init + def incr(self): self.value += 1; return self.value + + counter = Counter.remote(init=10) + result = ray.get(counter.incr.remote()) + ray.shutdown() + +Note: This is a synchronous wrapper around async Pulsing. +For better performance in async environments, use pulsing.actor directly. +""" + +import asyncio +import concurrent.futures +import inspect +import threading +from typing import Any, TypeVar + +T = TypeVar("T") + +# Global state +_system = None +_loop = None +_thread = None +_loop_ready = None + + +def _ensure_not_initialized(ignore_reinit_error: bool) -> None: + global _system + if _system is not None: + if ignore_reinit_error: + return + raise RuntimeError("Already initialized. Call ray.shutdown() first.") + + +def _start_background_loop() -> None: + """Start a dedicated event loop in a background thread. + + This is required when the caller is already inside a running event loop. + """ + global _thread, _loop, _loop_ready + if _thread is not None: + return + + ready = threading.Event() + _loop_ready = ready + + def _thread_main(): + global _loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + _loop = loop + ready.set() + loop.run_forever() + try: + pending = asyncio.all_tasks(loop) + for task in pending: + task.cancel() + if pending: + loop.run_until_complete( + asyncio.gather(*pending, return_exceptions=True) + ) + finally: + loop.close() + + t = threading.Thread( + target=_thread_main, name="pulsing-compat-ray-loop", daemon=True + ) + _thread = t + t.start() + ready.wait() + + +def _run_coro_sync(coro: Any, timeout=None) -> Any: + """Run a coroutine to completion and return its result. + + - If we have a background loop thread: schedule via run_coroutine_threadsafe(). + - Otherwise: run on the local event loop with run_until_complete(). + """ + if _loop is None: + raise RuntimeError("Not initialized. Call ray.init() first.") + + if _thread is not None: + fut = asyncio.run_coroutine_threadsafe(coro, _loop) + try: + return fut.result(timeout=timeout) + except concurrent.futures.TimeoutError as e: + raise TimeoutError("Timed out waiting for result") from e + else: + # Local (non-running) loop only + return _loop.run_until_complete(coro) + + +class ObjectRef: + """Ray-compatible ObjectRef (wraps async coroutine)""" + + def __init__(self, coro_or_result: Any, is_ready: bool = False): + self._coro = coro_or_result + self._result = coro_or_result if is_ready else None + self._is_ready = is_ready + + def _get_sync(self, timeout: float = None) -> Any: + """Get result synchronously""" + if self._is_ready: + return self._result + + async def _get(): + return await self._coro + + if timeout is not None: + coro = asyncio.wait_for(_get(), timeout) + else: + coro = _get() + + self._result = _run_coro_sync(coro, timeout=timeout) + self._is_ready = True + return self._result + + +class _MethodCaller: + """Method caller that returns ObjectRef""" + + def __init__(self, proxy, method_name: str): + self._proxy = proxy + self._method = method_name + + def remote(self, *args, **kwargs) -> ObjectRef: + """Call method remotely (Ray-style)""" + method = getattr(self._proxy, self._method) + coro = method(*args, **kwargs) + return ObjectRef(coro) + + +class _ActorHandle: + """Ray-compatible actor handle""" + + def __init__(self, proxy, methods: list[str]): + self._proxy = proxy + self._methods = set(methods) + + def __getattr__(self, name: str) -> _MethodCaller: + if name.startswith("_"): + raise AttributeError(name) + if name not in self._methods: + raise AttributeError(f"No method '{name}'") + return _MethodCaller(self._proxy, name) + + +class _ActorClass: + """Ray-compatible actor class wrapper""" + + def __init__(self, cls: type): + self._cls = cls + self._pulsing_class = None + self._methods = [ + n + for n, _ in inspect.getmembers(cls, predicate=inspect.isfunction) + if not n.startswith("_") + ] + + def _ensure_wrapped(self): + if self._pulsing_class is None: + from pulsing.actor import remote + + self._pulsing_class = remote(self._cls) + + def remote(self, *args, **kwargs) -> _ActorHandle: + """Create actor (Ray-style, synchronous)""" + if _system is None: + raise RuntimeError("Not initialized. Call ray.init() first.") + + self._ensure_wrapped() + + async def create(): + proxy = await self._pulsing_class.local(_system, *args, **kwargs) + return _ActorHandle(proxy, self._methods) + + return _run_coro_sync(create()) + + def options(self, **kwargs) -> "_ActorClass": + """Set actor options (Ray compatibility, limited support)""" + # TODO: Support num_cpus, num_gpus, etc. + return self + + def __call__(self, *args, **kwargs): + """Direct instantiation (not as actor)""" + return self._cls(*args, **kwargs) + + +def init( + address: str = None, + *, + ignore_reinit_error: bool = False, + **kwargs, +) -> None: + """Initialize Pulsing (Ray-compatible) + + Args: + address: Ignored (use SystemConfig for Pulsing configuration) + ignore_reinit_error: If True, ignore if already initialized + + Example: + from pulsing.compat import ray + ray.init() + """ + global _system, _loop + + _ensure_not_initialized(ignore_reinit_error) + + from pulsing.actor import SystemConfig, create_actor_system + + # If we're already inside a running event loop (e.g., Jupyter/pytest-asyncio), + # we must not call run_until_complete() on it. Use a dedicated background loop. + in_running_loop = True + try: + asyncio.get_running_loop() + except RuntimeError: + in_running_loop = False + + if in_running_loop: + _start_background_loop() + else: + _loop = asyncio.new_event_loop() + asyncio.set_event_loop(_loop) + + config = SystemConfig.standalone() + _system = _run_coro_sync(create_actor_system(config)) + + +def shutdown() -> None: + """Shutdown Pulsing (Ray-compatible)""" + global _system, _loop, _thread, _loop_ready + + if _system is not None: + try: + _run_coro_sync(_system.shutdown()) + except Exception: + pass + _system = None + if _thread is not None and _loop is not None: + try: + _loop.call_soon_threadsafe(_loop.stop) + except Exception: + pass + try: + _thread.join(timeout=2.0) + except Exception: + pass + _thread = None + _loop_ready = None + _loop = None + else: + _loop = None + + +def is_initialized() -> bool: + """Check if initialized""" + return _system is not None + + +def remote(cls: type[T]) -> _ActorClass: + """@ray.remote decorator (Ray-compatible) + + Example: + @ray.remote + class Counter: + def __init__(self, init=0): self.value = init + def incr(self): self.value += 1; return self.value + + counter = Counter.remote(init=10) + """ + return _ActorClass(cls) + + +def get(refs: Any, *, timeout: float = None) -> Any: + """Get results from ObjectRefs (Ray-compatible) + + Args: + refs: Single ObjectRef or list of ObjectRefs + timeout: Timeout in seconds + + Example: + result = ray.get(counter.incr.remote()) + results = ray.get([ref1, ref2, ref3]) + """ + if _system is None: + raise RuntimeError("Not initialized. Call ray.init() first.") + + if isinstance(refs, list): + return [r._get_sync(timeout) for r in refs] + return refs._get_sync(timeout) + + +def put(value: Any) -> ObjectRef: + """Put value (Ray-compatible) + + Note: Pulsing doesn't have distributed object store. + This just wraps the value for API compatibility. + """ + return ObjectRef(value, is_ready=True) + + +def wait( + refs: list, + *, + num_returns: int = 1, + timeout: float = None, +) -> tuple[list, list]: + """Wait for ObjectRefs (Ray-compatible) + + Returns: + (ready, remaining) tuple + """ + ready, remaining = [], list(refs) + for ref in refs[:num_returns]: + try: + get(ref, timeout=timeout) + ready.append(ref) + remaining.remove(ref) + except Exception: + break + return ready, remaining + + +__all__ = [ + "init", + "shutdown", + "is_initialized", + "remote", + "get", + "put", + "wait", + "ObjectRef", +] diff --git a/tests/python/test_new_api.py b/tests/python/test_new_api.py new file mode 100644 index 000000000..48c19e09e --- /dev/null +++ b/tests/python/test_new_api.py @@ -0,0 +1,336 @@ +""" +Tests for the new Pulsing API styles. + +Covers: +- Native async API (pulsing.actor with init/shutdown/remote) +- Ray-compatible API (pulsing.compat.ray) +""" + +import asyncio + +import pytest + + +# ============================================================================ +# Native Async API Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_native_api_basic(): + """Test basic native async API workflow.""" + from pulsing.actor import init, shutdown, remote + + @remote + class Counter: + def __init__(self, value=0): + self.value = value + + def get(self): + return self.value + + def inc(self, n=1): + self.value += n + return self.value + + await init() + + try: + counter = await Counter.spawn(value=10) + assert await counter.get() == 10 + assert await counter.inc(5) == 15 + assert await counter.inc() == 16 + finally: + await shutdown() + + +@pytest.mark.asyncio +async def test_native_api_multiple_actors(): + """Test multiple actors with native API.""" + from pulsing.actor import init, shutdown, remote + + @remote + class Worker: + def __init__(self, worker_id): + self.worker_id = worker_id + self.tasks_done = 0 + + def process(self, data): + self.tasks_done += 1 + return f"{self.worker_id}: processed {data}" + + def get_stats(self): + return {"id": self.worker_id, "tasks": self.tasks_done} + + await init() + + try: + workers = [await Worker.spawn(worker_id=f"w{i}") for i in range(3)] + + # Process some tasks + results = [] + for i, w in enumerate(workers): + result = await w.process(f"task-{i}") + results.append(result) + + assert len(results) == 3 + assert "w0: processed task-0" in results[0] + assert "w1: processed task-1" in results[1] + assert "w2: processed task-2" in results[2] + + # Check stats + for i, w in enumerate(workers): + stats = await w.get_stats() + assert stats["id"] == f"w{i}" + assert stats["tasks"] == 1 + + finally: + await shutdown() + + +@pytest.mark.asyncio +async def test_native_api_async_methods(): + """Test actors with async methods.""" + from pulsing.actor import init, shutdown, remote + + @remote + class AsyncProcessor: + def __init__(self): + self.processed = [] + + async def process(self, item): + await asyncio.sleep(0.01) # Simulate async work + result = item.upper() + self.processed.append(result) + return result + + def get_processed(self): + return self.processed + + await init() + + try: + processor = await AsyncProcessor.spawn() + + # Process multiple items + results = await asyncio.gather( + processor.process("hello"), + processor.process("world"), + processor.process("pulsing"), + ) + + assert "HELLO" in results + assert "WORLD" in results + assert "PULSING" in results + + processed = await processor.get_processed() + assert len(processed) == 3 + + finally: + await shutdown() + + +@pytest.mark.asyncio +async def test_native_api_concurrent_calls(): + """Test concurrent calls to the same actor.""" + from pulsing.actor import init, shutdown, remote + + @remote + class Counter: + def __init__(self): + self.value = 0 + + def inc(self): + self.value += 1 + return self.value + + def get(self): + return self.value + + await init() + + try: + counter = await Counter.spawn() + + # Send many concurrent increments + tasks = [counter.inc() for _ in range(100)] + await asyncio.gather(*tasks) + + # Due to actor's sequential processing, final value should be 100 + final = await counter.get() + assert final == 100 + + finally: + await shutdown() + + +# ============================================================================ +# Ray-Compatible API Tests +# ============================================================================ + + +def test_ray_compat_api_basic(): + """Test basic Ray-compatible API workflow.""" + from pulsing.compat import ray + + ray.init() + + try: + + @ray.remote + class Counter: + def __init__(self, value=0): + self.value = value + + def get(self): + return self.value + + def inc(self, n=1): + self.value += n + return self.value + + counter = Counter.remote(value=10) + assert ray.get(counter.get.remote()) == 10 + assert ray.get(counter.inc.remote(5)) == 15 + assert ray.get(counter.inc.remote()) == 16 + + finally: + ray.shutdown() + + +def test_ray_compat_api_multiple_actors(): + """Test multiple actors with Ray-compatible API.""" + from pulsing.compat import ray + + ray.init() + + try: + + @ray.remote + class Worker: + def __init__(self, worker_id): + self.worker_id = worker_id + + def process(self, data): + return f"{self.worker_id}: {data}" + + workers = [Worker.remote(worker_id=f"w{i}") for i in range(3)] + + # Process tasks + refs = [w.process.remote(f"task-{i}") for i, w in enumerate(workers)] + results = ray.get(refs) + + assert len(results) == 3 + assert "w0: task-0" in results[0] + assert "w1: task-1" in results[1] + assert "w2: task-2" in results[2] + + finally: + ray.shutdown() + + +def test_ray_compat_api_wait(): + """Test ray.wait() functionality.""" + from pulsing.compat import ray + + ray.init() + + try: + + @ray.remote + class SlowWorker: + def work(self, duration): + import time + + time.sleep(duration) + return f"done after {duration}s" + + worker = SlowWorker.remote() + + # Submit multiple tasks with different durations + refs = [ + worker.work.remote(0.01), + worker.work.remote(0.02), + worker.work.remote(0.03), + ] + + # Wait for at least 1 to complete + ready, remaining = ray.wait(refs, num_returns=1, timeout=5.0) + + assert len(ready) >= 1 + assert len(ready) + len(remaining) == 3 + + # Get all results + all_results = ray.get(refs) + assert len(all_results) == 3 + + finally: + ray.shutdown() + + +def test_ray_compat_api_put_get(): + """Test ray.put() and ray.get() for object store.""" + from pulsing.compat import ray + + ray.init() + + try: + # Put objects in store + ref1 = ray.put({"key": "value1"}) + ref2 = ray.put([1, 2, 3, 4, 5]) + ref3 = ray.put("hello world") + + # Get objects back + assert ray.get(ref1) == {"key": "value1"} + assert ray.get(ref2) == [1, 2, 3, 4, 5] + assert ray.get(ref3) == "hello world" + + # Batch get + results = ray.get([ref1, ref2, ref3]) + assert len(results) == 3 + + finally: + ray.shutdown() + + +# ============================================================================ +# API Migration Tests +# ============================================================================ + + +def test_migration_pattern(): + """ + Demonstrate migration pattern from Ray to Pulsing. + + This test shows that the same logic works with both APIs. + """ + + # The actual computation logic + def create_counter_class(decorator): + @decorator + class Counter: + def __init__(self, value=0): + self.value = value + + def inc(self): + self.value += 1 + return self.value + + return Counter + + # Test with Ray-compatible API + from pulsing.compat import ray + + ray.init() + + try: + RayCounter = create_counter_class(ray.remote) + counter = RayCounter.remote(value=0) + result = ray.get(counter.inc.remote()) + assert result == 1 + finally: + ray.shutdown() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/python/test_ray_compat_running_loop.py b/tests/python/test_ray_compat_running_loop.py new file mode 100644 index 000000000..f5b6b4d22 --- /dev/null +++ b/tests/python/test_ray_compat_running_loop.py @@ -0,0 +1,31 @@ +import asyncio + + +def test_ray_compat_init_inside_running_loop(): + """ray.init() should work even when called from within a running event loop. + + This covers environments like Jupyter or pytest-asyncio where an event loop + is already running on the main thread. + """ + from pulsing.compat import ray + + async def main(): + ray.init() + try: + + @ray.remote + class Counter: + def __init__(self, value=0): + self.value = value + + def inc(self, n=1): + self.value += n + return self.value + + c = Counter.remote(value=1) + assert ray.get(c.inc.remote()) == 2 + assert ray.get(c.inc.remote(10)) == 12 + finally: + ray.shutdown() + + asyncio.run(main()) diff --git a/tests/python/test_system_actor.py b/tests/python/test_system_actor.py index ec9c5c01c..b077d5bcb 100644 --- a/tests/python/test_system_actor.py +++ b/tests/python/test_system_actor.py @@ -20,7 +20,7 @@ get_node_info, health_check, ping, - as_actor, + remote, ) @@ -277,13 +277,13 @@ async def test_python_actor_service_list_registry(system): # ============================================================================ -# Test: @as_actor with PythonActorService +# Test: @remote with PythonActorService # ============================================================================ -@as_actor +@remote class TestCounter: - """Test counter class for @as_actor.""" + """Test counter class for @remote.""" def __init__(self, init_value=0): self.value = init_value @@ -297,8 +297,8 @@ def get_value(self): @pytest.mark.asyncio -async def test_as_actor_local_creation(system): - """@as_actor should allow local actor creation.""" +async def test_remote_local_creation(system): + """@remote should allow local actor creation.""" counter = await TestCounter.local(system, init_value=10) # Should be able to call methods @@ -310,8 +310,8 @@ async def test_as_actor_local_creation(system): @pytest.mark.asyncio -async def test_as_actor_class_registered(system): - """@as_actor decorated class should be registered in global registry.""" +async def test_remote_class_registered(system): + """@remote decorated class should be registered in global registry.""" service_ref = await system.resolve_named("_python_actor_service") msg = Message.from_json("ListRegistry", {}) resp = await service_ref.ask(msg)