原文标题:Java 22 新增利器: 使用 Java Stream Gather 优雅地处理流中的状态
原文作者:阿里云开发者
冷月清谈:
怜星夜思:
2、Stream Gather在实现并发控制上有哪些创新的方法?
3、虚拟线程在Stream Gather中的应用前景如何?
原文内容
阿里妹导读
一、背景
二、什么 是 Stream ?
三、各种库有什么不一样
四、为什么需要 Stream Gather API?
问题空间
Java Stream API 目前主要缺乏丰富的 操作符,而如果像其他的库一样添加很多的操作符到 Stream API,又会有较大的维护负担,如果希望只添加有限的操作符,却可以解决绝大部分的问题。则新的操作符需要支持下面的特性:
-
支持多种类型的数据转换:1 :1, 如 map 操作符1 :N, 如 filter ,flatmapN :1, 如 bufferN:M, 如 intersperse
-
支持 “有限流” 和 “无限流”
-
支持 “有状态” 和 “无状态”操作
-
支持“提前终止”和 “全量消费”
-
支持 “单线程顺序处理”和 “多线程并行处理”
-
支持感知 “结束”,如实现一个 fold 操作符
Stream Collector & Gather
五、分析和对比
Stream Gather API 拆解
-
supplier:产生最初的 State
-
integrator + downstream:转换 和 传递值;
-
map操作符无状态, 1 :1 地产生元素
-
filter 操作符无状态, 1 : 0 ... 1 地产生元素
public static <T, R> Gatherer<T, ?, R> map(final Function<T, R> mapper) { return Gatherer.of((notused, element, downstream) -> downstream.push(mapper.apply(element))); }
public static <T> Gatherer<T, ?, T> filter(final Predicate<T> predicate) {
return Gatherer.of((notused, element, downstream) -> {
if (predicate.test(element)) {
return downstream.push(element);
}
return true;
});
}
和其他的类库对比
def statefulMap[S, T](
create: function.Creator[S],
f: function.Function2[S, Out, Pair[S, T]],
onComplete: function.Function[S, Optional[T]]): javadsl.Flow[In, T, Mat]
“操作符”地址:
public static <T> Source<Pair<T, Integer>, NotUsed> zipWithIndex(final Source<T, NotUsed> source) { return source.statefulMap( () -> 0, (state, element) -> Pair.create(state + 1, Pair.create(element, state)), notused -> Optional.empty() ); }
public static <T> Stream<Pair<T, Integer>> zipWithIndex(final Stream<T> stream) {
class State {
int index;
}
return stream.gather(Gatherer.ofSequential(
State::new,
(state, element, downstream) -> downstream.push(Pair.create(element, state.index++))
));
}
自带的 gathers 分析
-
mapConcurrent
public static <T, R> Gatherer<T,?,R> mapConcurrent(
final int maxConcurrency, //限制:最大并行度
final Function<? super T, ? extends R> mapper) //虚拟线程中执行的转换
整个实现非常的简单:
public static <T, R> Gatherer<T,?,R> mapConcurrent( final int maxConcurrency, final Function<? super T, ? extends R> mapper) { class State { final ArrayDeque<Future<R>> window = new ArrayDeque<>(Math.min(maxConcurrency, 16)); //使用信号量,不需要复杂的判断逻辑 final Semaphore windowLock = new Semaphore(maxConcurrency);final boolean integrate(T element,
Downstream<? super R> downstream) {
if (!downstream.isRejecting())
createTaskFor(element);
return flush(0, downstream);
}final void createTaskFor(T element) {
//阻塞等待permit,这里不是虚拟线程
windowLock.acquireUninterruptibly();var task = new FutureTask<R>(() -> {
try {
return mapper.apply(element);
} finally {
//处理完成后释放信号量permit
windowLock.release();
}
});var wasAddedToWindow = window.add(task);
//使用虚拟线程来执行具体的任务
Thread.startVirtualThread(task);
}final boolean flush(long atLeastN,
Downstream<? super R> downstream) {
//…省略很多代码,将结果值推送给下一个处理节点
downstream.push(current.get());
}
}
return Gatherer.ofSequential(
State::new,
Integrator.<State, T, R>ofGreedy(State::integrate),
(state, downstream) -> state.flush(Long.MAX_VALUE, downstream)
);
}
我们可以看到:利用 虚拟线程 来并发的执行 mapper, 并结合 信号量来实现 maxConcurrency的限制。整个实现非常简单,感兴趣的同学可以对比下 Reactor-core 中 flatmap和 Pekko-Stream 中 mapAsync的实现。
-
fold
public static <T, R> Gatherer<T, ?, R> fold(
Supplier<R> initial,
BiFunction<? super R, ? super T, ? extends R> folder) {
class State {
R value = initial.get(); //初始状态,记录聚合结果值
State() {}
}
return Gatherer.ofSequential(
State::new,
Integrator.ofGreedy((state, element, downstream) -> {
state.value = folder.apply(state.value, element);
return true;
}),
//流处理结束,返回结果值给流的下一个处理节点
(state, downstream) -> downstream.push(state.value)
);
}
同样将状态保持在了 局部的 State类中,并且在结束时,调用了 finisher返回的方法,将最终的值推送给了流的下一个处理节点。因为是 fold方法不一定满足 结合律,所以上面使用的是 Gatherer.ofSequential, 来保证串行执行。同时,Stream Gather API 也支持多个 gather 之间组合,相当于其他库中的 fuse ,继而提高性能。
六、未来展望和小结
-
在底层技术上,我们具备深厚的Android和iOS底层技术积累,拥有丰富的编译器、链接器、解释器技术应用实践。
-
在研发模式上,我们负责原生研发模式DX演进,服务数千开发者、承载数百亿日PV,深耕系统原生渲染技术,致力于建立下一代终端研发模式。
-
在网络技术上,我们在终端网络、传输和超大规模网关有深厚技术积累,负责开源方案XQUIC/Tengine,承载亿级长连和千万级QPS;在国际IETF标准、顶会SIGCOMM均有建树。
-
在终端技术上,我们打造领先行业的移动技术产品,涵盖多端架构、性能体验、组件框架、用户增长等关键领域,致力于移动端系统及厂商特性前沿探索。
-
在后端技术上,我们负责移动基础设施,有百万级QPS API网关、消息/推送、Serverless平台、自适应流控等柔性高可用解决方案。打造覆盖移动App全生命周期工程技术平台。
-
在跨端技术上,我们负责Weex2.0和核心Web容器,研究领域涉及W3C标准、WebKit内核、脚本引擎和自绘渲染引擎,面向Web标准提供一流跨端能力。通过卡片级小部件和小游戏技术,丰富创意供给,提供差异化的购物体验。
-
在前端技术上,我们在前端框架、工程、低代码领域长期深耕,支撑大促营销ProCode、LowCode、NoCode跨端页面研发;配套前沿的页面托管;负责ICE、微前端等开源方案,致力于提供简单友好的研发体系。









