多线程数据采集

/ 2025-11-05

//全部采集完成的债务人
Set debtorsCompleted = ConcurrentHashMap.newKeySet();
//全部未开始采集的债务人
Set debtorsUncompleted = ConcurrentHashMap.newKeySet();
//债务人正在采集中的sku
Set existKeySet = getCompletingSkus(debtorBatchList);




List<CompletableFuture> futures = new ArrayList<>();
for (DebtorBatch debtorBatch : debtorBatchList) {
CompletableFuture future = CompletableFuture.runAsync(() -> {
//债务人未采集完成的sku
List unCompletedSkus = getUncompletedSkus(debtorBatch.getDebtorCreditCode(), skus);
if (CollectionUtils.isEmpty(unCompletedSkus)) {
//债务人sku全部存在于redis
debtorsCompleted.add(debtorBatch.getDebtorCreditCode());
} else {
//排除已在采集中的
unCompletedSkus.removeIf(
sku -> existKeySet.contains(debtorBatch.getDebtorCreditCode() + “_” + sku));
//sku都在采集中
if (unCompletedSkus.isEmpty()){
return;
}
if (unCompletedSkus.size() == skus.size()) {
//债务人sku数据全部不存在于redis,以及没有正在采集中的
debtorsUncompleted.add(debtorBatch.getDebtorCreditCode());
} else {
//债务人sku数据部分存在于redis或者在采集中
String skuStr = String.join(“,”, unCompletedSkus);
handleServiceCall.queryData(debtorBatch.getBatchCode(),debtorBatch.getDebtorCreditCode(),skuStr);
}
}
}, taskExecutor)
.exceptionally(ex -> {
log.error(“异步处理失败, batchCode={}, debtor={}”,
debtorBatch.getBatchCode(),
debtorBatch.getDebtorCreditCode(),
ex);
return null;
});
futures.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
log.info(“asyncDataCollec 所有任务执行完成!”);





































转载请注明作者和出处,并添加本页链接。
原文链接: //pongpongkai.top/147

皖ICP备2025092356号-1 | 沪公网安备31011202021249号