关注梦兽编程微信公众号,轻松入门Rust

夜里十点,咖啡店只剩几盏橘灯。我站在吧台后面,看着新来的小伙伴系好围裙。

“记住,”我递给他一只对讲机,“每个人都有自己的收件箱。来了单,不要喊全店,直接塞进该谁的盒子里。他会一杯一杯做,谁也不跟谁抢锅。店里就不会乱。”

他点点头。我又指了指墙上的排班表:“有人累趴了怎么办?简单,我们有店长值守。谁掉了线,店长把他拉起来,或者给你换个新人。名字都登记在前台的通讯录里,你要找人,叫名字就行。”

“听起来不复杂。”他笑。

“是啊。”我把清单推到他面前,“在 Rust 里,这就是 Actor 模型。别把并发想成群聊,它更像一间小而有序的店:消息是订单,Actor 是店员,对讲机是地址,监督是店长,通讯录是注册表。我们现在就开门营业。”

接下来,我们把这个小店搭起来。代码很短,路子要清。

1) 定义 Actor:给“店员”定个规矩

#[async_trait::async_trait]
pub trait Actor: Send + 'static {
    type Message: Send + 'static;
    async fn handle(&mut self, msg: Self::Message);
}

规矩其实就两条:你负责哪类单(Message),来了单就别废话,接过来自己消化(handle)。像吧台上的老话——接单的人,决定了它的命。

2) 地址 Addr<T>:给“店员”一部对讲机

我把对讲机塞到他手里,说:“看见红灯亮,就按一下,说一句话,订单就飞进对方的盒子里了。你不用追着人跑,也不用吼一嗓子全店都听见。” 这只对讲机,在代码里就是一条 mpsc 发送通道,简称 Addr<T>

use tokio::sync::mpsc;

#[derive(Clone)]
pub struct Addr<T> {
    sender: mpsc::Sender<T>,
}

impl<T> Addr<T> {
    pub async fn send(&self, msg: T) -> Result<(), mpsc::error::SendError<T>> {
        self.sender.send(msg).await
    }
}

3) spawn_actor:把“店员”拉上班

他打上班卡,围裙一系,站到自己的工位。收件箱里咔嗒一响,有单进来,他就按流程做;没单,就安静等下一杯。这就是“上班”的样子。

use tokio::sync::mpsc;
use tokio::task::JoinHandle;

pub fn spawn_actor<A>(mut actor: A, capacity: usize) -> (Addr<A::Message>, JoinHandle<()>)
where
    A: Actor,
{
    let (tx, mut rx) = mpsc::channel::<A::Message>(capacity);
    let handle = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            actor.handle(msg).await;
        }
    });
    (Addr { sender: tx }, handle)
}
  • 收件箱满了就会 send 等一会儿,天然限流,像排队点单。

4) 监督 supervise_actor:店长在旁边看着

店长总是最后一个下班的人。他不多话,只盯两件事:人还在不在,店还运不运转。谁要是忽然倒下了,他先让店缓一缓,再把人扶起来,或者换个新班。稳,是他唯一的性格。

use std::time::Duration;

pub fn supervise_actor<A, F>(make: F, capacity: usize) -> Addr<A::Message>
where
    A: Actor,
    F: Fn() -> A + Send + Sync + 'static,
{
    let (addr_tx, addr_rx) = std::sync::mpsc::sync_channel::<Addr<A::Message>>(1);

    std::thread::spawn(move || {
        let rt = tokio::runtime::Runtime::new().expect("rt");
        rt.block_on(async move {
            loop {
                let (addr, handle) = spawn_actor(make(), capacity);
                let _ = addr_tx.send(addr.clone());
                // 等待 Actor 结束(无论正常结束还是异常崩溃)
                let _ = handle.await;
                // 歇口气再拉人上班(持续监督)
                tokio::time::sleep(Duration::from_millis(200)).await;
            }
        });
    });

    // 拿到第一次可用地址
    addr_rx.recv().expect("supervisor failed to start")
}

店长不紧不慢,但心里有数。真到生产环境,你会把监督放进同一个 tokio 现场里,配上退避、上限、告警,像老店的 SOP,一件不少。

5) 命名注册表:像前台的“通讯录”

前台抽屉里有一本本子,写着每个人的名字和对讲机频道。你不需要记号码,只要报名字,就能把话递到人手里。注册表,就是那本本子。

use once_cell::sync::Lazy;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

static REGISTRY: Lazy<Arc<Mutex<HashMap<String, Box<dyn Any + Send + Sync>>>>> = Lazy::new(|| {
    Arc::new(Mutex::new(HashMap::new()))
});

pub fn register_addr<T: 'static + Send + Sync>(name: &str, addr: Addr<T>) {
    let mut map = REGISTRY.lock().unwrap();
    map.insert(name.to_string(), Box::new(addr));
}

pub fn get_addr<T: 'static + Send + Sync>(name: &str) -> Option<Addr<T>> {
    let map = REGISTRY.lock().unwrap();
    map.get(name)
        .and_then(|b| b.downcast_ref::<Addr<T>>())
        .cloned()
}

说明:上面是演示思路。实际项目里更推荐用 dashmap 与类型安全封装,或在应用层集中管理地址(避免跨类型擦除)。

6) 示例:做一杯“计数咖啡”

第一天的夜班,客人不多。我让他做个小游戏:每来一杯,就在心里数一下;有人问今天做了几杯,就把数字报出去。两种话术,足够练手:Inc(n) 表示“再来 n 杯”,Get 表示“现在是多少?”

use tokio::sync::oneshot;

enum CounterMsg {
    Inc(u64),
    Get(oneshot::Sender<u64>),
}

struct Counter { value: u64 }

#[async_trait::async_trait]
impl Actor for Counter {
    type Message = CounterMsg;
    async fn handle(&mut self, msg: Self::Message) {
        match msg {
            CounterMsg::Inc(n) => self.value += n,
            CounterMsg::Get(tx) => { let _ = tx.send(self.value); }
        }
    }
}

#[tokio::main]
async fn main() {
    let (addr, _h) = spawn_actor(Counter { value: 0 }, 64);

    addr.send(CounterMsg::Inc(3)).await.unwrap();
    addr.send(CounterMsg::Inc(7)).await.unwrap();

    let (tx, rx) = oneshot::channel();
    addr.send(CounterMsg::Get(tx)).await.unwrap();

    let v = rx.await.unwrap();
    println!("counter = {}", v); // 10
}

看到了吧?两次“再来一杯”,一次“报个数”。他没有抱怨、没有抢活,按顺序来,店里像钟表一样走得稳。

7) 扩展方向(按需挑选)

要是这家小店慢慢火起来,你会顺势添置些家伙事:前台挂个 HTTP 外卖窗(在 axum/warp 的接口里握着 Addr<T>,来一单就丢给后厨);后厨装一条传送带(mpsc/broadcast/Stream 把半成品一路往前送);人多了分出“分单台”和“工作池”,各司其职;店长升级 SOP,加上退避、分级、限次;再装上几块表,时延、队列、失败率用 tracing 亮出来,夜里也睡得踏实。

常见疑问

问:我能不能直接 await 一个返回值? 答:可以,但更地道的做法是给对方塞一个一次性回信的信封(oneshot),他办完事,把结果塞回来。

问:消息会不会丢? 答:队列是有容量的,满了就会让你等一会儿(背压)。容量和节奏设计好了,队里不乱,消息就稳。

问:性能咋样? 答:大多数业务绰绰有余。真要冲极限,你可以上无锁结构、专用执行器,再做点本地化优化。

小结

这就是我们的小店:人各有岗,话各有路,忙而不乱。你看到的名词——ActorAddrspawnsupervise、注册表——不过是把这套秩序落在了代码里。别急着堆功能,先把“消息驱动、串行处理、可监督”这三件事做稳了。等哪天店里排起了长队,你会发现,这套老规矩,正是你扩张的底气。

关注梦兽编程微信公众号,解锁更多黑科技