rust ABC (四)并发编程和tokio

rust ABC (四)并发编程和tokio

rust作为一个功能强大的语言,支持多种编程范式,同时支持抢夺式进程和协程,同时对于共享变量提供了线程锁,引用计数,管道的同步和通讯机制,具备强大的功能。同时可以使用tokio,一个三方库,更强大的并发运行时,替换 std 库中的并发。由于基本上 tokio 成了rust程序的默认并发运行时,这里主要讲 tokio ,不过这里的 tokiostd 的函数签名基本是相同的,比如 spawntask 等,因此无须有多余的担心。最后会讲 async 和 await 关键字,用于实现异步任务

tokio 的 runtime 和 task

runtime 也就是 抢夺式进程 ,task 也就是协程。

其中 runtime 创建方式如下

use tokio;

fn main() {
  // 创建带有线程池的runtime
  let rt = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(8)  // 8个工作线程
    .enable_io()        // 可在runtime中使用异步IO
    .enable_time()      // 可在runtime中使用异步计时器(timer)
    .build()            // 创建runtime
    .unwrap();

    rt.block_on(async {
        println!("before sleep: {}", Local::now().format("%F %T.%3f"));
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        println!("after sleep: {}", Local::now().format("%F %T.%3f"));
    });

}

也可以直接使用 runtime

fn main() {
    tokio::spawn(async {
        println!("before sleep: {}", Local::now().format("%F %T.%3f"));
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        println!("after sleep: {}", Local::now().format("%F %T.%3f"));
    });

}	

task 又被称作Asynchronous green-threads(异步的绿色线程),创建方式如下

use chrono::Local;
use std::thread;
use tokio::{self, task, runtime::Runtime, time};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    let _guard = rt.enter();
    task::spawn(async {
        time::sleep(time::Duration::from_secs(3)).await;
        println!("task over: {}", now());
    });

    thread::sleep(time::Duration::from_secs(4));
}

或者直接使用 task

fn main() {
    tokio::task::spawn(async {
        println!("before sleep: {}", Local::now().format("%F %T.%3f"));
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        println!("after sleep: {}", Local::now().format("%F %T.%3f"));
    });

}	

共享变量

并发程序中如何处理并发问题,一般来说有两种思路。一种是java中同步机制,使用锁来控制并发。另一种是go的协程机制,通过通讯来共享内存,而不是共享内存来通讯。

同步

tokio::sync模块提供了几种状态同步的机制:

然后就是使用互斥锁,一般签名如下

let mutex = Arc::new(Mutex::new());
let rwlock = Arc::new(RwLock::new());

这里就不说明使用方式了,一般有编程基础的都会使用,这里只需要注意两点

然后关于栅栏类,提供了以下方式

举例如下

    let barrier = Arc::new(Barrier::new(3));


    for t in 0..3 {
        let c = barrier.clone();
        tokio::task::spawn(async move {
            for i in 1..100 {
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                println!("开始执行第{}个", i);
            }
            c.wait().await;
            println!("after wait");
        });
    }

    println!("主线程完成");
通讯

也就是go中的管道(channel),go中的管道有无界管道和有界管道两种类型,同样Rust也是支持无界管道和有界管道。在此基础之上,go中常用的就是多对一,一种类型,rust中有一对一、一对多、多对一、多对多四种类型,但是使用最多的是多对一这一类型。

一个异步的例子

use tokio::{ self, runtime::Runtime, sync };

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        let (tx, mut rx) = sync::mpsc::channel::<i32>(10);

        for i in 1..=10 {
            let tx = tx.clone();
            tokio::spawn(async move {
                if tx.send(i).await.is_err() {
                    println!("receiver closed");
                }
            });
        }
        drop(tx);

        while let Some(i) = rx.recv().await {
            println!("received: {}", i);
        }
    });
}

async 和 await 关键字

简单来说如果写过一些前端,那么就能够理解这两个关键字,这两个关键字就是协程的一种实现方式,将主任务拆分成多个子任务,并且通过CPS 的变换,消除了callback。

简单来说Rust就是以下几个特性

一个文件io的场景如下

use tokio::{self, fs::File, io::{self, AsyncReadExt}, runtime};

fn main() {
    let rt = runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let f1 = File::open("a.log").await.unwrap();
        let f2 = File::open("b.log").await.unwrap();
        let mut f = f1.chain(f2);

        let mut data = String::new();
        let n = f.read_to_string(&mut data).await.unwrap();
        println!("data {} bytes: {:?}", n, data);
    });
}