Differences between async & sync
- sync programming often has imperative behaviour (hành vi bắt buộc, sai khiến)
- async programming is about constructing a process at runtime and then executing it
- this process is called the “futures tree”
Futures
An async fn returns a Future
async fn
returns aFuture
typeFuture
is a trait that is implemented by types that can be asynchronously awaited- For e.g., this following async fn returns a
Future
type:
use tokio::{fs::File, io::AsyncReadExt};
async fn read_from_disk(path: &str) -> std::io::Result<String> {
let mut file = File::open(path).await?;
let mut buffer = String::new();
file.read_to_string(&mut buffer).await?;
Ok(buffer)
}
Then, desugaring the above async fn, please notice the fn signature is changed to:
use std::future::Future;
use tokio::{fs::File, io::AsyncReadExt};
// The return type of the function is now a generic type `F` that implements `Future`
fn read_from_disk<'a>(path: &'a str)
-> impl Future<Output = std::io::Result<String>> + 'a
{
// The body of the function is now a `async` block
// move keyword is used to move the variables into the closure
async move {
let mut file = File::open(path).await?;
let mut buffer = String::new();
file.read_to_string(&mut buffer).await?;
Ok(buffer)
}
}
What are Futures?
- A
Future
is a type that represents an asynchronous computation - Futures represent a datastructure that - at some point in the future - give us the value that we are waiting for.
The Future may be:
- delayed
- immediate
- infinite
Futures are operations
Futures are complete operations that can be awaited for. Examples:
read
: Read (up to) a number of bytesread_to_end
: Read a complete input streamconnect
: Connect a socket
Futures are poll-based
They can be checked if they are done
, and are usually mapped to readiness based APIs like epoll
.
.await
registers interest in completion
Futures need to be executed
use tokio::{fs::File, io::AsyncReadExt};
#[tokio::main]
async fn main() {
let read_from_disk_future = read_from_disk("Cargo.toml");
let result = async {
let task = tokio::task::spawn(read_from_disk_future);
task.await
}
.await;
println!("{:?}", result);
}
async fn read_from_disk(path: &str) -> std::io::Result<String> {
let mut file = File::open(path).await?;
let mut buffer = String::new();
file.read_to_string(&mut buffer).await?;
Ok(buffer)
}
Tasks
- A task connects a future to the executor
- The task is the concurrent unit!
- A task is similar to a thread, but is user-space scheduled
Futures all the way down: Combining Futures
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::time::Duration;
#[tokio::main]
async fn main() {
let read_from_disk_future = read_from_disk("Cargo.toml");
let timeout = Duration::from_millis(1000);
let timeout_read = tokio::time::timeout(timeout, read_from_disk_future);
let result = async {
let task = tokio::task::spawn(timeout_read);
task.await
}
.await;
println!("{:?}", result);
}
Ownership/Borrowing Memory in concurrent systems
- Ownership works just like expected - it flows in and out of tasks/futures
- Borrows work over
.await
points - This means: All owned memory in a Future must remain at the same place (Tất cả bộ nhớ được sở hữu trong 1 Future phải ở cùng một vị trí).
- Sharing between tasks is often done using
Rc/Arc
Categories of Executors
single-threaded
- Generally better latency, no synchronization requirements
- Highly susceptible to accidental blockades (rất dễ bị phong tỏa ngẫu nhiên)
- Harmed by accidental pre-emption (bị hại do vô tình sử dụng trước)
multi-threaded
- Generally better resource use, synchronization requirements
- Harmed by accidental pre-emption
deblocking (gỡ lỗi)
- Actively monitor for blocked execution threads and will spin up new ones
Reference Counting (Rc)
- Reference counting on single-threaded executors can be done using
Rc
(Việc đếm tham chiếu trên các trình thực thi đơn luồng có thể được thực hiện bằng cách sử dụng Rc) - Reference counting on multi-threaded executors can be done using
Arc
.
Streams
- Streams are async iterators (streams chính là trình vòng lặp không đồng bộ)
- They represent potentially infinite arrivals
- They cannot be executed, but operations on them are futures
Classic Stream operations
- iteration
- merging
- filtering
Async Iteration
while let Some(item) = stream.next().await {
//...
}