see the previous entry here discussing building a framework for writing custom operators in rust
using async rust to power a touchdesigner top
In TouchDesigner, TOPs work with texture data. Because it's generally better
to miss a frame rather than stall consumers of a texture, it's common for
TOP operators to use some kind of concurrency to offload processing in a
way that doesn't block execute
on waiting for more data.
What this typically might look like is something like:
- ~ Obtain a lock to update shared parameter state.
- ~ Flag an atomic bool to indicate to the producer thread more frames are needed.
- ~ Check an atomic bool to see if the producer should exit.
- ~ Do work on the producer thread and push to some kind of synchronized queue when done.
- ~ Attempt to read new data from queue on the
excecute
thread and upload our texture if there is one.
In C++, this easily can mean using several mutex and other concurrency primitives, which is error prone and tedious. While Rust provides better concurrent programming idioms using things like channels, this still can require passing around several different channels in order to manage the lifecycle of a producer: a channel to trigger more work, a channel to update settings, a channel to check for exit, a chanell to push new data, etc.
Luckily, using async Rust, there's a better way!
the async story in rust so far
Unlike languages like JavaScript or C#, Rust doesn't ship with a runtime for doing async work. Instead, users must choose an "executor" library that async work will run on. By and large, the Rust community has settled on Tokio. However, while there are many wonderful crates that build on top of this ecosystem, bridging the gap into synchronous code is not always pretty.
Typically, Tokio would suggest you block a thread on waiting on a future:
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
// blocking call to async fn
let res = runtime::block_on(async {
some_async_work().await
})
// do something with res...
However, since our goal is to not block the execute
Thread, this poses a problem.
How can we make use of the Rust async ecosystem, but still call our async work in
a non-blocking way from a synchronous context?
poll, poll, poll
Our example operater is going to call a remote HTTP resource which returns a pixel buffer. Because this external resource is slow (think ML model, etc.), we want to saturate it with a number of requests to improve GPU utilization. In other words, we want to maximize throughput at the expense of a bit of latency. We will get more frames faster, but they may be delayed from the parameters used to generate them. For our use case, this works.
Using tokio
, we'll submit up to MAX_TASKS
tasks and store the resulting join handles
on our operator. We'll also store the last processed request in order to prevent
unecessary requests for frames where our operator parameters don't change.
pub type Task = JoinHandle<anyhow::Result<Vec<u8>>>;
pub struct RemoteHttpTop {
params: RemoteHttpTopParams,
execute_count: u32,
context: TopContext,
tasks: VecDeque<Task>,
last_req: Option<ImageReq>,
}
impl TopNew for RemoteHttpTop {
fn new(_info: NodeInfo, context: TopContext) -> Self {
Self {
params: Default::default(),
execute_count: 0,
context,
tasks: VecDeque::with_capacity(MAX_TASKS),
last_req: None,
}
}
}
Our execute
method is simple. Check if we got an image, if so, upload the image to the TOP.
fn execute(&mut self, mut output: TopOutput, input: &OperatorInputs<TopInput>) {
self.execute_count += 1;
if let Some(mut image) = self.get_image() {
// upload image here...
}
}
But how can we implement get_image
without blocking the thread? The implementation might at first look a
bit peculiar:
fn get_image(&mut self) -> Option<Vec<u8>> {
RUNTIME.block_on(self)
}
What does it mean to block_on
self? Importantly, this means that we have implemented the trait
std::future::Future
for our operator itself. Our goal here is to implement our operator as a future
that never blocks. In other words, we "run" our operator as a Tokio task, but only so that we can
check the status of our pending tasks, returning them if ready and simply moving on if not:
impl Future for RemoteHttpTop {
type Output = Option<Vec<u8>>;
// Poll internal taskss
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
// Get our current param state as request
let req = self.params_as_req();
// If our params are different and we have space for a new task
// create a new task request
if self.last_req.as_ref() != Some(&req) && self.tasks.len() < MAX_TASKS {
// Self::request_image is an async function...
self.tasks.push_back(tokio::spawn(Self::request_image(req.clone())));
self.last_req = Some(req);
};
// While we have tasks, poll them
// If they're ready, return the image
// If they're failed, throw them away
// If they're not ready, reinsert them at the beginning
while let Some(mut task) = self.tasks.pop_front() {
// Pin'n'poll
match Pin::new(&mut task).poll(cx) {
Poll::Ready(Ok(Ok(image))) => {
return Poll::Ready(Some(image));
}
Poll::Ready(Ok(Err(_))) | Poll::Ready(Err(_)) => {
continue;
}
Poll::Pending => {
self.tasks.insert(0, task);
return Poll::Ready(None);
}
}
}
return Poll::Ready(None);
}
}
By only ever returning Poll::Ready
, we ensure that our blocking method called
in execute
never actually blocks for more time than it takes to check if a new
image is ready.
Importantly, we didn't have to use any concurrency primitives here. And, because our poll
method gets access to Pin<&mut Self>
, we can do arbitrary mutation to our operator
in the futures body while being able to kick off new async work.
disadvantages
Implementing this does require some knowledge of how futures work in Rust. It's also not ideal if we also want to do CPU bound work in addition to waiting on some IO resource.
Using Tokio also means giving up some fine grained control over how we schedule our work. When we spawn an async task, Tokio is promising us that it runs... somewhere, on a thread pool scheduled and managed internally. Tokio is designed for high throughput networked services, and so this can mean that tasks for a more important operator don't always get priority over other tasks that might be okay de-prioritize.
conclusion
Using Rust is fun, and in simple cases, using async techniques can produce significantly simpler code that doesn't require fiddly locking or other concurrency primitives. The async Rust ecosystem is large, which can provide a lot of library support for common tasks, particularly around things like networked resources.