Differences between async & sync
- sync programming often has imperative behaviour (hành vi bắt buộc, sai khiến)
- async programming is about constructing a process at runtime and then executing it
- this process is called the “futures tree”
Futures
An async fn returns a Future
async fnreturns aFuturetypeFutureis a trait that is implemented by types that can be asynchronously awaited- For e.g., this following async fn returns a
Futuretype:
use tokio::{fs::File, io::AsyncReadExt};
async fn read_from_disk(path: &str) -> std::io::Result<String> {
let mut file = File::open(path).await?;
let mut buffer = String::new();
file.read_to_string(&mut buffer).await?;
Ok(buffer)
}
Then, desugaring the above async fn, please notice the fn signature is changed to:
use std::future::Future;
use tokio::{fs::File, io::AsyncReadExt};
// The return type of the function is now a generic type `F` that implements `Future`
fn read_from_disk<'a>(path: &'a str)
-> impl Future<Output = std::io::Result<String>> + 'a
{
// The body of the function is now a `async` block
// move keyword is used to move the variables into the closure
async move {
let mut file = File::open(path).await?;
let mut buffer = String::new();
file.read_to_string(&mut buffer).await?;
Ok(buffer)
}
}
What are Futures?
- A
Futureis a type that represents an asynchronous computation - Futures represent a datastructure that - at some point in the future - give us the value that we are waiting for.
The Future may be:
- delayed
- immediate
- infinite
Futures are operations
Futures are complete operations that can be awaited for. Examples:
read: Read (up to) a number of bytesread_to_end: Read a complete input streamconnect: Connect a socket
Futures are poll-based
They can be checked if they are done, and are usually mapped to readiness based APIs like epoll.
.await registers interest in completion
Futures need to be executed
use tokio::{fs::File, io::AsyncReadExt};
#[tokio::main]
async fn main() {
let read_from_disk_future = read_from_disk("Cargo.toml");
let result = async {
let task = tokio::task::spawn(read_from_disk_future);
task.await
}
.await;
println!("{:?}", result);
}
async fn read_from_disk(path: &str) -> std::io::Result<String> {
let mut file = File::open(path).await?;
let mut buffer = String::new();
file.read_to_string(&mut buffer).await?;
Ok(buffer)
}
Tasks
- A task connects a future to the executor
- The task is the concurrent unit!
- A task is similar to a thread, but is user-space scheduled
Futures all the way down: Combining Futures
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::time::Duration;
#[tokio::main]
async fn main() {
let read_from_disk_future = read_from_disk("Cargo.toml");
let timeout = Duration::from_millis(1000);
let timeout_read = tokio::time::timeout(timeout, read_from_disk_future);
let result = async {
let task = tokio::task::spawn(timeout_read);
task.await
}
.await;
println!("{:?}", result);
}
Ownership/Borrowing Memory in concurrent systems
- Ownership works just like expected - it flows in and out of tasks/futures
- Borrows work over
.awaitpoints - This means: All owned memory in a Future must remain at the same place (Tất cả bộ nhớ được sở hữu trong 1 Future phải ở cùng một vị trí).
- Sharing between tasks is often done using
Rc/Arc
Categories of Executors
single-threaded
- Generally better latency, no synchronization requirements
- Highly susceptible to accidental blockades (rất dễ bị phong tỏa ngẫu nhiên)
- Harmed by accidental pre-emption (bị hại do vô tình sử dụng trước)
multi-threaded
- Generally better resource use, synchronization requirements
- Harmed by accidental pre-emption
deblocking (gỡ lỗi)
- Actively monitor for blocked execution threads and will spin up new ones
Reference Counting (Rc)
- Reference counting on single-threaded executors can be done using
Rc(Việc đếm tham chiếu trên các trình thực thi đơn luồng có thể được thực hiện bằng cách sử dụng Rc) - Reference counting on multi-threaded executors can be done using
Arc.
Streams
- Streams are async iterators (streams chính là trình vòng lặp không đồng bộ)
- They represent potentially infinite arrivals
- They cannot be executed, but operations on them are futures
Classic Stream operations
- iteration
- merging
- filtering
Async Iteration
while let Some(item) = stream.next().await {
//...
}