add stream parallelism extension combinators

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-21 20:43:38 +00:00
parent 19f6d9d0e1
commit 8ab825b12c
3 changed files with 76 additions and 3 deletions

View file

@ -7,6 +7,7 @@ mod iter_stream;
mod ready;
mod tools;
mod try_broadband;
mod try_parallel;
mod try_ready;
mod try_tools;
mod try_wideband;
@ -24,6 +25,7 @@ pub use iter_stream::IterStream;
pub use ready::ReadyExt;
pub use tools::Tools;
pub use try_broadband::TryBroadbandExt;
pub use try_parallel::TryParallelExt;
pub use try_ready::TryReadyExt;
pub use try_tools::TryTools;
pub use try_wideband::TryWidebandExt;

View file

@ -18,7 +18,7 @@ where
) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
where
N: Into<Option<usize>>,
F: Fn(Self::Ok) -> Fut + Send + Sync,
F: Fn(Self::Ok) -> Fut + Send,
Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send;
fn broad_and_then<U, F, Fut>(
@ -26,7 +26,7 @@ where
f: F,
) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
where
F: Fn(Self::Ok) -> Fut + Send + Sync,
F: Fn(Self::Ok) -> Fut + Send,
Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
{
self.broadn_and_then(None, f)
@ -44,7 +44,7 @@ where
) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
where
N: Into<Option<usize>>,
F: Fn(Self::Ok) -> Fut + Send + Sync,
F: Fn(Self::Ok) -> Fut + Send,
Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
{
self.map_ok(f)

View file

@ -0,0 +1,71 @@
//! Parallelism stream combinator extensions to futures::Stream
use futures::{stream::TryStream, TryFutureExt};
use tokio::{runtime, task::JoinError};
use super::TryBroadbandExt;
use crate::{utils::sys::available_parallelism, Error, Result};
/// Parallelism extensions to augment futures::StreamExt. These combinators are
/// for computation-oriented workloads, unlike -band combinators for I/O
/// workloads; these default to the available compute parallelism for the
/// system. Threads are currently drawn from the tokio-spawn pool. Results are
/// unordered.
pub trait TryParallelExt<T, E>
where
Self: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
E: From<JoinError> + From<Error> + Send + 'static,
T: Send + 'static,
{
fn paralleln_and_then<U, F, N, H>(
self,
h: H,
n: N,
f: F,
) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
where
N: Into<Option<usize>>,
H: Into<Option<runtime::Handle>>,
F: Fn(Self::Ok) -> Result<U, E> + Clone + Send + 'static,
U: Send + 'static;
fn parallel_and_then<U, F, H>(
self,
h: H,
f: F,
) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
where
H: Into<Option<runtime::Handle>>,
F: Fn(Self::Ok) -> Result<U, E> + Clone + Send + 'static,
U: Send + 'static,
{
self.paralleln_and_then(h, None, f)
}
}
impl<T, E, S> TryParallelExt<T, E> for S
where
S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
E: From<JoinError> + From<Error> + Send + 'static,
T: Send + 'static,
{
fn paralleln_and_then<U, F, N, H>(
self,
h: H,
n: N,
f: F,
) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
where
N: Into<Option<usize>>,
H: Into<Option<runtime::Handle>>,
F: Fn(Self::Ok) -> Result<U, E> + Clone + Send + 'static,
U: Send + 'static,
{
let n = n.into().unwrap_or_else(available_parallelism);
let h = h.into().unwrap_or_else(runtime::Handle::current);
self.broadn_and_then(n, move |val| {
let (h, f) = (h.clone(), f.clone());
async move { h.spawn_blocking(move || f(val)).map_err(E::from).await? }
})
}
}