Async building blocks

Differences between async & sync

  • Sync programming is typically imperative: each statement runs as soon as it is reached
  • Async programming is about constructing a process at runtime and then executing it
  • This process is called the “futures tree”
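
To make the "construct first, execute later" idea concrete, here is a minimal sketch using only the standard library. The `block_on` helper, `ThreadWaker`, `step`, `process` and `build_then_run` are hypothetical names for this illustration, not part of any library; a real program would normally use a runtime such as tokio instead.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: polls one future on the current thread until it is done.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// A leaf of the futures tree: counts how often it actually ran.
async fn step(calls: &Cell<u32>) -> u32 {
    calls.set(calls.get() + 1);
    21
}

/// The root of a small futures tree with two leaves.
async fn process(calls: &Cell<u32>) -> u32 {
    step(calls).await + step(calls).await
}

/// Returns (calls before execution, result, calls after execution).
fn build_then_run() -> (u32, u32, u32) {
    let calls = Cell::new(0);
    // Constructing the process: no code inside `process` or `step` runs here.
    let future = process(&calls);
    let before = calls.get();
    // Executing the process: only now are the leaves driven to completion.
    let value = block_on(future);
    let after = calls.get();
    (before, value, after)
}
```

Calling `build_then_run()` from `main` shows that the counter stays at zero until the future is handed to an executor.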

Futures

An async fn returns a Future

  • An async fn returns a type that implements Future
  • Future is a trait that is implemented by types that can be asynchronously awaited
  • For example, the following async fn returns a Future:
use tokio::{fs::File, io::AsyncReadExt};

async fn read_from_disk(path: &str) -> std::io::Result<String> {
    let mut file = File::open(path).await?;

    let mut buffer = String::new();
    file.read_to_string(&mut buffer).await?;
    Ok(buffer)
}

Desugared, the async fn above looks like this. Note how the function signature changes:

use std::future::Future;

use tokio::{fs::File, io::AsyncReadExt};

// The return type is now an opaque type (`impl Trait`) that implements `Future`
fn read_from_disk<'a>(path: &'a str)
   -> impl Future<Output = std::io::Result<String>> + 'a
{
    // The body of the function is now an `async` block
    // The `move` keyword moves the captured variables (here `path`) into the block
    async move {
        let mut file = File::open(path).await?;

        let mut buffer = String::new();
        file.read_to_string(&mut buffer).await?;
        Ok(buffer)
    }
}

What are Futures?

  • A Future is a type that represents an asynchronous computation
  • Futures represent a data structure that, at some point in the future, gives us the value we are waiting for. A Future may be:
    • delayed (the value arrives later)
    • immediate (the value is already available)
    • infinite (it never completes)
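
The immediate and infinite cases can be observed directly with `std::future::ready` and `std::future::pending`. The sketch below polls a future exactly once by hand; `NoopWaker` and `poll_once` are hypothetical helpers written for this illustration.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that ignores wake-ups; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `future` exactly once and reports what it returned.
fn poll_once<F: Future>(future: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    future.as_mut().poll(&mut cx)
}
```

For example, `poll_once(std::future::ready(7))` yields `Poll::Ready(7)`, while `poll_once(std::future::pending::<()>())` yields `Poll::Pending` no matter how often it is repeated.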

Futures are operations

Futures are complete operations that can be awaited. Examples:

  • read: Read (up to) a number of bytes
  • read_to_end: Read a complete input stream
  • connect: Connect a socket

Futures are poll-based

A future can be polled to check whether it is done. Polling usually maps to readiness-based APIs such as epoll.

Using .await registers interest in the future's completion.
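
As a rough illustration of the poll model, here is a hand-written future that needs several polls before it is ready. Each time it returns `Pending` it uses the waker from the `Context` to ask to be polled again, which is the "registering interest" part. `CountDown`, `CountingWaker` and `drive` are hypothetical names; a real executor would queue the task on wake-up rather than just count.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future that reports `Pending` a fixed number of times before completing.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Not done yet: ask to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Counts wake-ups instead of scheduling anything.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls a `CountDown` by hand; returns (number of polls, number of wake-ups, output).
fn drive(remaining: u32) -> (u32, usize, &'static str) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut future = CountDown { remaining };
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(output) = Pin::new(&mut future).poll(&mut cx) {
            return (polls, counter.0.load(Ordering::SeqCst), output);
        }
    }
}
```

With `drive(3)` the future is polled four times and wakes its waker three times before it yields its output.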

Futures need to be executed

use tokio::{fs::File, io::AsyncReadExt};

#[tokio::main]
async fn main() {
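    // Calling the async fn only creates the future; nothing is read yet
    let read_from_disk_future = read_from_disk("Cargo.toml");

    let result = async {
        // Spawning hands the future to the executor as a task
        let task = tokio::task::spawn(read_from_disk_future);
        // Awaiting the JoinHandle waits for the task to finish
        task.await
    }
    .await;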

    println!("{:?}", result);
}

async fn read_from_disk(path: &str) -> std::io::Result<String> {
    let mut file = File::open(path).await?;

    let mut buffer = String::new();
    file.read_to_string(&mut buffer).await?;
    Ok(buffer)
}

Tasks

  • A task connects a future to the executor
  • The task is the unit of concurrency
  • A task is similar to a thread, but is scheduled in user space
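
To show what "connecting a future to the executor" can look like, here is a toy single-threaded executor built only on the standard library. It is a sketch, not how tokio is implemented: `Task`, `Executor`, `YieldNow` and `interleaved_log` are hypothetical names, and the run queue is a plain `VecDeque` behind a mutex.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
type RunQueue = Arc<Mutex<VecDeque<Arc<Task>>>>;

/// A task ties one future to the executor's run queue.
struct Task {
    future: Mutex<Option<BoxFuture>>,
    queue: RunQueue,
}

impl Wake for Task {
    /// Waking a task puts it back on the run queue.
    fn wake(self: Arc<Self>) {
        let queue = self.queue.clone();
        queue.lock().unwrap().push_back(self);
    }
}

struct Executor {
    queue: RunQueue,
}

impl Executor {
    fn new() -> Self {
        Executor { queue: Arc::new(Mutex::new(VecDeque::new())) }
    }

    /// Wraps the future in a task and schedules it.
    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
        let future: BoxFuture = Box::pin(future);
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            queue: self.queue.clone(),
        });
        self.queue.lock().unwrap().push_back(task);
    }

    /// Polls queued tasks until none are left.
    fn run(&self) {
        loop {
            // Release the queue lock before polling, since polling may wake tasks.
            let next = self.queue.lock().unwrap().pop_front();
            let Some(task) = next else { break };
            let waker = Waker::from(task.clone());
            let mut cx = Context::from_waker(&waker);
            let mut slot = task.future.lock().unwrap();
            if let Some(mut future) = slot.take() {
                if future.as_mut().poll(&mut cx).is_pending() {
                    // Not finished: keep the future for the next wake-up.
                    *slot = Some(future);
                }
            }
        }
    }
}

/// Suspends once so that other tasks get a turn.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Runs two tasks that each log, yield, and log again.
fn interleaved_log() -> Vec<String> {
    let executor = Executor::new();
    let log = Arc::new(Mutex::new(Vec::new()));
    for name in ["a", "b"] {
        let log = log.clone();
        executor.spawn(async move {
            log.lock().unwrap().push(format!("{name}1"));
            YieldNow(false).await;
            log.lock().unwrap().push(format!("{name}2"));
        });
    }
    executor.run();
    let entries = log.lock().unwrap().clone();
    entries
}
```

The two tasks take turns on a single thread: each runs until its `.await` suspends it, which is the user-space scheduling mentioned above.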

Futures all the way down: Combining Futures

use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let read_from_disk_future = read_from_disk("Cargo.toml");

    let timeout = Duration::from_millis(1000);
    let timeout_read = tokio::time::timeout(timeout, read_from_disk_future);

    let result = async {
        let task = tokio::task::spawn(timeout_read);
        task.await
    }
    .await;

    println!("{:?}", result);
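}

// Same helper as in the previous example
async fn read_from_disk(path: &str) -> std::io::Result<String> {
    let mut file = File::open(path).await?;

    let mut buffer = String::new();
    file.read_to_string(&mut buffer).await?;
    Ok(buffer)
}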
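
A combinator like a timeout is itself just a future that wraps other futures. The following sketch shows the idea with the standard library only; it is not how `tokio::time::timeout` is implemented. `Sleep`, `Timeout`, `Elapsed`, `timeout` and `block_on` here are hypothetical stand-ins, and the timer naively uses one helper thread per sleep and only remembers the waker from its first poll.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Wakes the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: polls one future on the current thread until it is done.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Toy timer: a helper thread sets a flag after `delay` and then wakes the task.
struct Sleep {
    delay: Duration,
    done: Arc<AtomicBool>,
    started: bool,
}

impl Future for Sleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.done.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        if !self.started {
            self.started = true;
            let (delay, done, waker) = (self.delay, self.done.clone(), cx.waker().clone());
            thread::spawn(move || {
                thread::sleep(delay);
                done.store(true, Ordering::SeqCst);
                waker.wake();
            });
        }
        Poll::Pending
    }
}

/// Error returned when the timer finishes before the inner future.
#[derive(Debug, PartialEq)]
struct Elapsed;

/// A future made of two futures: the wrapped operation and a timer.
struct Timeout<F> {
    inner: Pin<Box<F>>,
    timer: Sleep,
}

impl<F: Future> Future for Timeout<F> {
    type Output = Result<F::Output, Elapsed>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The inner future gets the first chance to finish.
        if let Poll::Ready(value) = self.inner.as_mut().poll(cx) {
            return Poll::Ready(Ok(value));
        }
        match Pin::new(&mut self.timer).poll(cx) {
            Poll::Ready(()) => Poll::Ready(Err(Elapsed)),
            Poll::Pending => Poll::Pending,
        }
    }
}

fn timeout<F: Future>(delay: Duration, inner: F) -> Timeout<F> {
    Timeout {
        inner: Box::pin(inner),
        timer: Sleep { delay, done: Arc::new(AtomicBool::new(false)), started: false },
    }
}
```

Wrapping a future that never completes, such as `std::future::pending::<u32>()`, in `timeout(Duration::from_millis(50), ...)` and running it with `block_on` gives `Err(Elapsed)`; wrapping one that is ready immediately gives `Ok` with its value.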

Ownership/Borrowing Memory in concurrent systems

  • Ownership works as expected: it flows in and out of tasks and futures
  • Borrows work across .await points
  • This means all memory owned by a Future must stay at the same location once the Future has started running, which is what pinning guarantees
  • Sharing between tasks is often done using Rc/Arc
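
A small sketch of these rules, assuming a hypothetical std-only `block_on` helper and a hypothetical `YieldNow` future that suspends once. The `String` is moved into the future, borrowed across a real suspension point, and then handed back to the caller.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: pins the future, then polls it until it is done.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Suspends once: the first poll returns `Pending`, the second returns `Ready`.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Borrows `text` and keeps the borrow alive across an `.await` point.
async fn shout(text: &str) -> String {
    YieldNow(false).await;
    text.to_uppercase()
}

/// Takes ownership of `owned`, lends it out across `.await`, then hands it back.
async fn round_trip(owned: String) -> (String, String) {
    let loud = shout(&owned).await;
    (owned, loud)
}

fn demo_ownership() -> (String, String) {
    let input = String::from("hello");
    // `input` moves into the future. While suspended, the future holds both the
    // String and a reference to it, so it must not move; `block_on` pins it first.
    block_on(round_trip(input))
}
```

Because the suspended `round_trip` future stores a reference into its own state, moving it after the first poll would leave that reference dangling, which is why futures are pinned before they are polled.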

Categories of Executors

  • single-threaded

    • Generally better latency, no synchronization requirements
    • Highly susceptible to accidental blocking
    • Harmed by accidental pre-emption
  • multi-threaded

    • Generally better resource use, but requires synchronization
    • Harmed by accidental pre-emption
  • deblocking

    • Actively monitors for blocked execution threads and spins up new ones

Reference Counting (Rc)

  • Reference counting on single-threaded executors can be done using Rc
  • Reference counting on multi-threaded executors can be done using Arc.
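
The difference can be sketched with the standard library alone. Below, a hypothetical `block_on` helper stands in for a single-threaded executor, and plain threads that each call `block_on` stand in for a multi-threaded one; `thread::spawn` demands `Send` in the same way a multi-threaded executor's spawn function does. All function names are made up for this illustration.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: polls one future on the current thread until it is done.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Shares state through `Rc`: fine as long as everything stays on one thread.
async fn push_local(log: Rc<RefCell<Vec<u32>>>, value: u32) {
    log.borrow_mut().push(value);
}

/// Shares state through `Arc`: the resulting future is `Send`.
async fn add_shared(total: Arc<Mutex<u32>>, value: u32) {
    *total.lock().unwrap() += value;
}

fn single_threaded() -> Vec<u32> {
    let log: Rc<RefCell<Vec<u32>>> = Rc::new(RefCell::new(Vec::new()));
    block_on(async {
        push_local(log.clone(), 1).await;
        push_local(log.clone(), 2).await;
    });
    let entries = log.borrow().clone();
    entries
}

fn multi_threaded() -> u32 {
    let total = Arc::new(Mutex::new(0));
    let handles: Vec<_> = (1..=4)
        .map(|value| {
            let total = total.clone();
            // Replacing `Arc` with `Rc` here would not compile: `Rc` is not `Send`.
            thread::spawn(move || block_on(add_shared(total, value)))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let sum = *total.lock().unwrap();
    sum
}
```

`Rc` avoids atomic operations, which is why it is attractive when the executor guarantees that a task never leaves its thread.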

Streams

  • Streams are async iterators
  • They represent potentially infinite arrivals
  • They cannot be executed directly, but operations on them are futures

Classic Stream operations

  • iteration
  • merging
  • filtering

Async Iteration

while let Some(item) = stream.next().await {
    //...
}
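
The loop above relies on a `next` operation that returns a future. Here is a self-contained sketch of how that can fit together, using a hypothetical `Stream` trait modelled on the one from the futures crate. `Countdown`, `Next`, the free function `next`, `collect_even` and `block_on` are all made up for this illustration; with a real stream library the call would be written `stream.next().await` as shown above.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: polls one future on the current thread until it is done.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async iterator: like `Iterator::next`, but poll-based.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Yields n, n-1, ..., 1 and then ends.
struct Countdown(u32);

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.0 == 0 {
            Poll::Ready(None)
        } else {
            self.0 -= 1;
            Poll::Ready(Some(self.0 + 1))
        }
    }
}

/// The stream itself is not a future, but "give me the next item" is.
struct Next<'a, S>(&'a mut S);

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.0).poll_next(cx)
    }
}

fn next<S: Stream + Unpin>(stream: &mut S) -> Next<'_, S> {
    Next(stream)
}

/// Iteration plus filtering: keeps only the even items.
async fn collect_even(mut stream: impl Stream<Item = u32> + Unpin) -> Vec<u32> {
    let mut out = Vec::new();
    while let Some(item) = next(&mut stream).await {
        if item % 2 == 0 {
            out.push(item);
        }
    }
    out
}
```

Running `block_on(collect_even(Countdown(5)))` walks the items 5 down to 1 and keeps 4 and 2.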
 


By hn1Do@ht-0n3z3Roo11e, 2022-10-22