Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions docs/overrides/home.html
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,10 @@ <h3>Streaming Messages</h3>
<span class="feature-icon">🐍</span>
{% if config.theme.language == "zh" %}
<h3>Python 优先</h3>
<p>通过 PyO3 提供完整的 Python API。使用 <code>@as_actor</code> 装饰器将任何类转换为分布式 Actor。</p>
<p>通过 PyO3 提供完整的 Python API。使用 <code>@remote</code> 装饰器将任何类转换为分布式 Actor。</p>
{% else %}
<h3>Python First</h3>
<p>Full Python API via PyO3. Use the <code>@as_actor</code> decorator to turn any class into a distributed actor.</p>
<p>Full Python API via PyO3. Use the <code>@remote</code> decorator to turn any class into a distributed actor.</p>
{% endif %}
</div>
<div class="feature-card">
Expand Down Expand Up @@ -607,9 +607,9 @@ <h3>{% if config.theme.language == "zh" %}安装 Pulsing{% else %}Install Pulsin
<div class="step-content">
<h3>{% if config.theme.language == "zh" %}创建你的第一个 Actor{% else %}Create Your First Actor{% endif %}</h3>
<div class="code-block">
<pre><code class="language-python">from pulsing.actor import as_actor, create_actor_system, SystemConfig
<pre><code class="language-python">from pulsing.actor import init, shutdown, remote

@as_actor
@remote
class Calculator:
def __init__(self, initial: int = 0):
self.value = initial
Expand All @@ -631,16 +631,16 @@ <h3>{% if config.theme.language == "zh" %}运行它{% else %}Run It{% endif %}</
<pre><code class="language-python">import asyncio

async def main():
system = await create_actor_system(SystemConfig.standalone())
await init()

calc = await Calculator.local(system, initial=100)
calc = await Calculator.spawn(initial=100)

result = await calc.add(50) # 150
result = await calc.add(25) # 175
value = await calc.get() # 175

print(f"Final value: {value}")
await system.shutdown()
await shutdown()

asyncio.run(main())</code></pre>
</div>
Expand Down
15 changes: 11 additions & 4 deletions docs/src/api_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,14 @@ class ActorRef:

## Decorators

### @as_actor
### @remote

Convert a class into an Actor automatically.

```python
@as_actor
from pulsing.actor import init, shutdown, remote

@remote
class MyActor:
def __init__(self, value: int):
self.value = value
Expand All @@ -210,12 +212,17 @@ class MyActor:

async def process(self, data: str) -> dict:
return {"result": data.upper()}

async def main():
await init()
actor = await MyActor.spawn(value=10)
print(await actor.get()) # 10
await shutdown()
```

After decoration, the class provides:

- `local(system, **kwargs) -> ActorRef`: Create actor locally
- `remote(system, **kwargs) -> ActorRef`: Create actor remotely (or locally if single node)
- `spawn(**kwargs) -> ActorRef`: Create actor (uses global system from `init()`)

## Functions

Expand Down
15 changes: 11 additions & 4 deletions docs/src/api_reference.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,14 @@ class ActorRef:

## 装饰器

### @as_actor
### @remote

自动将类转换为 Actor。

```python
@as_actor
from pulsing.actor import init, shutdown, remote

@remote
class MyActor:
def __init__(self, value: int):
self.value = value
Expand All @@ -210,12 +212,17 @@ class MyActor:

async def process(self, data: str) -> dict:
return {"result": data.upper()}

async def main():
await init()
actor = await MyActor.spawn(value=10)
print(await actor.get()) # 10
await shutdown()
```

装饰后,类提供:

- `local(system, **kwargs) -> ActorRef`: 本地创建 actor
- `remote(system, **kwargs) -> ActorRef`: 远程创建 actor(单节点时回退到本地)
- `spawn(**kwargs) -> ActorRef`: 创建 actor(使用 `init()` 初始化的全局系统)

## 函数

Expand Down
98 changes: 48 additions & 50 deletions docs/src/design/as-actor-decorator.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# @as_actor 装饰器设计文档
# @remote 装饰器设计文档

## 概述

`@as_actor` 是一个便利装饰器,将普通 Python 类自动转换为分布式 Actor。它提供类似 Ray 的编程体验,让用户无需关心底层的消息传递细节。
`@remote` 是一个便利装饰器,将普通 Python 类自动转换为分布式 Actor。它提供类似 Ray 的编程体验,让用户无需关心底层的消息传递细节。

## 设计目标

Expand All @@ -15,16 +15,16 @@

```mermaid
flowchart TB
subgraph Decorator["@as_actor 装饰器"]
A["@as_actor<br/>class Counter"] --> B["ActorClass(Counter)"]
subgraph Decorator["@remote 装饰器"]
A["@remote<br/>class Counter"] --> B["ActorClass(Counter)"]
end

B --> C[".local()"]
B --> D[".remote()"]
B --> C[".spawn()"]
B --> D[".local()"]
B --> E[".__call__()"]

C --> F["本地 Actor"]
D --> G["远程 Actor"]
C --> F["Actor (全局系统)"]
D --> G["Actor (指定系统)"]
E --> H["普通实例"]

F --> I["ActorProxy"]
Expand All @@ -47,9 +47,9 @@ flowchart TB

| 方法 | 说明 |
|------|------|
| `.local(system, *args, **kwargs)` | 在本地节点创建 Actor |
| `.remote(system, *args, **kwargs)` | 在远程节点创建 Actor(随机选择) |
| `(*args, **kwargs)` | 直接调用,返回普通实例(非 Actor) |
| `.spawn(**kwargs)` | 使用全局系统创建 Actor(推荐) |
| `.local(system, **kwargs)` | 在指定系统本地节点创建 Actor |
| `(**kwargs)` | 直接调用,返回普通实例(非 Actor) |

### 2. ActorProxy

Expand All @@ -65,16 +65,16 @@ response = await actor_ref.ask(msg)
return response["result"]
```

### 3. SystemActor
### 3. PythonActorService

每个节点自动创建的系统 Actor,负责处理远程 Actor 创建请求:

```mermaid
sequenceDiagram
participant A as Node A
participant B as Node B (SystemActor)
participant B as Node B (PythonActorService)

A->>A: Counter.remote(system)
A->>A: Counter.spawn()
A->>A: 选择远程节点 B

A->>B: CreateActor {class, args}
Expand Down Expand Up @@ -103,9 +103,9 @@ class _WrappedActor:
### 基本用法

```python
from pulsing.actor import as_actor, create_actor_system, SystemConfig
from pulsing.actor import init, shutdown, remote

@as_actor
@remote
class Counter:
def __init__(self, init_value=0):
self.value = init_value
Expand All @@ -118,56 +118,55 @@ class Counter:
return self.value

async def main():
system = await create_actor_system(SystemConfig.standalone())
await init()

# 创建 Actor
counter = await Counter.local(system, init_value=10)
counter = await Counter.spawn(init_value=10)

# 调用方法
print(await counter.get()) # 10
print(await counter.increment(5)) # 15

await shutdown()
```

### 远程创建
### 集群模式

```python
# 节点 A
system_a = await create_actor_system(
SystemConfig.with_addr("0.0.0.0:8001")
)

# 节点 B(加入集群)
system_b = await create_actor_system(
SystemConfig.with_addr("0.0.0.0:8002").with_seeds(["127.0.0.1:8001"])
)

# 在节点 B 上执行,Actor 会创建在节点 A
counter = await Counter.remote(system_b, init_value=100)
print(await counter.get()) # 100 (数据在节点 A)
# 节点 A (Seed)
await init(addr="0.0.0.0:8001")

# 节点 B (加入集群)
await init(addr="0.0.0.0:8002", seeds=["127.0.0.1:8001"])

# 创建 Actor (可能在任意节点)
counter = await Counter.spawn(init_value=100)
print(await counter.get()) # 100
```

### 异步方法支持

```python
@as_actor
@remote
class AsyncWorker:
async def fetch_data(self, url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()

worker = await AsyncWorker.local(system)
worker = await AsyncWorker.spawn()
html = await worker.fetch_data("https://example.com")
```

### 命名 Actor

```python
# 指定名称,便于其他节点发现
counter = await Counter.local(system, name="global_counter", init_value=0)
counter = await Counter.spawn(name="global_counter", init_value=0)

# 其他地方可以通过名称解析
ref = await system.resolve_named("global_counter")
from pulsing.actor import get_system
ref = await get_system().resolve_named("global_counter")
```

### 作为普通类使用
Expand All @@ -180,19 +179,18 @@ counter.increment(5) # 同步调用,返回 15

## 与 Ray 对比

| 特性 | Ray | Pulsing @as_actor |
|------|-----|-------------------|
| 装饰器 | `@ray.remote` | `@as_actor` |
| 本地创建 | `Counter.remote()` | `Counter.local(system)` |
| 远程创建 | `Counter.options(resources=...).remote()` | `Counter.remote(system)` |
| 特性 | Ray | Pulsing @remote |
|------|-----|-----------------|
| 装饰器 | `@ray.remote` | `@remote` |
| 创建 Actor | `Counter.remote()` | `await Counter.spawn()` |
| 方法调用 | `ray.get(counter.increment.remote(5))` | `await counter.increment(5)` |
| 调度策略 | 自动调度 + 资源约束 | 随机选择远程节点 |
| 调度策略 | 自动调度 + 资源约束 | 随机选择节点 |
| 依赖 | Ray 集群 | 无外部依赖 |

## 限制

1. **方法参数必须可 JSON 序列化** - 参数通过 JSON 传输
2. **返回值必须可 JSON 序列化** - 结果通过 JSON 返回
1. **方法参数必须可序列化** - 参数通过 pickle 传输
2. **返回值必须可序列化** - 结果通过 pickle 返回
3. **类必须在所有节点可导入** - 远程创建需要目标节点能 import 该类
4. **不支持属性访问** - 只能调用方法,不能直接访问 `counter.value`

Expand All @@ -201,7 +199,7 @@ counter.increment(5) # 同步调用,返回 15
### 1. 方法设计

```python
@as_actor
@remote
class GoodDesign:
# ✓ 返回完整状态
def get_state(self):
Expand All @@ -226,15 +224,15 @@ except RuntimeError as e:

```python
# 并行调用多个 Actor
workers = [await Worker.local(system, id=i) for i in range(4)]
workers = [await Worker.spawn(id=i) for i in range(4)]
results = await asyncio.gather(*[w.process(data) for w in workers])
```

## 内部实现

### 类注册表

`@as_actor` 装饰时,类会被注册到全局表:
`@remote` 装饰时,类会被注册到全局表:

```python
_actor_class_registry["__main__.Counter"] = Counter
Expand Down Expand Up @@ -279,7 +277,7 @@ _actor_class_registry["__main__.Counter"] = Counter

## 未来规划

- [ ] 支持指定目标节点 `Counter.remote(system, node_id=xxx)`
- [ ] 支持指定目标节点 `Counter.spawn(node_id=xxx)`
- [ ] 支持负载均衡策略(轮询、最小负载等)
- [ ] 支持资源约束 `Counter.local(system, num_cpus=2)`
- [ ] 支持 Actor 池 `CounterPool.local(system, size=4)`
- [ ] 支持资源约束 `Counter.spawn(num_cpus=2)`
- [ ] 支持 Actor 池 `CounterPool.spawn(size=4)`
Loading
Loading