大数据志愿Flink整合ElasticSearch
发布时间:2025-10-03
//3、进行时依此类推的时长每条。对于基准型则对此起始的大数
config.put("bulk.flush.backoff.delay", "2");
//4、失利依此类推的数目
config.put("bulk.flush.backoff.retries", "3");
其他的一些内置:
bulk.flush.max.actions: 厂家加载时的仅次于加载条数 bulk.flush.max.size.mb: 厂家加载时的仅次于数据资料量 bulk.flush.interval.ms: 厂家加载的时长每条,内置后则可能会按照该时长每条严格制订,漠视后面的两个厂家加载内置
三、失利以外央执行器
加载ES的时候很多时候由于ES集群字段满了,或者路由挂丢出,时常可能会随之而来加载配置制订失利。考虑到这样的失利加载情节,EsSink为使用者包括了失利以外央执行器前提,始创Sink都可的时候,同时可以盛行一个失利以外央执行器,一旦显现加载失利的状况则可能会回调所盛行的以外央执行器运用于有误恢复。具体的专有名词为:
DataStream input = ...;
input.addSink(new ElasticsearchSink<>(
config, transportAddresses,
new ElasticsearchSinkFunction() {...},
new ActionRequestFailureHandler() {
@Override
void onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) throw Throwable {
if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
// 将失利请求继续加入字段,后续进行时依此类推加载
indexer.add(action);
} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
// 去丢出自假定的执行逻辑
} else {
throw failure;
}
}
}));
如果仅仅只是想要做到失利依此类推,也可以前提运运用于官方所包括的预设的RetryRejectedExecutionFailureHandler,该以外央执行器可能会对EsRejectedExecutionException随之而来到失利加载做到依此类推执行。
四、其他肯定点
1. EsSinkcode块不用运运用于try-catch-Exception来猎捕
之前在运运用于EsSink的时候,为了防止某次加载失利随之而来程序以外断,对ElasticsearchSinkFunction的 process() 新方法运运用于try-catch-exception语句块进行时了猎捕,但基本上运行的时候发掘出程序冲刺着冲刺着还是被一个 EsRejectedException 持续性以外断丢出了。让人那时候明明对持续性进行时了猎捕,为什么这个持续性还是能够抛出来,仍然通过查看软件包发掘出,如果在初始化EsSink都可的时候从未盛行 ActionRequestFailureHandler 则可能会运运用于预设的 ActionRequestFailureHandler ,这个以外央执行器的软件包如下:
public class NoOpFailureHandler implements ActionRequestFailureHandler {
private static final long serialVersionUID = 737941343410827885L;
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
// 这里抛出的是一个throwable
throw failure;
}
可以看不到,在发生持续性的时候,预设的以外央执行器可能会将持续性包装成一个 Throw 都可抛出,这就是前提运运用于 try-Exception 不会猎捕到的原因。
解决新方法:
做到到自己的失利以外央执行器消化丢出持续性 运运用于 throw 来猎捕持续性该解决办法一定要要点肯定,负责可能会随之而来高分辨率任务终止丢出!
2. 失利依此类推前提贫乏于checkpoint
如果自已运运用于EsSink的失利依此类推前提,www.atguigu.com则必需通过 env.enableCheckpoint() 新方法来触发Flink任务对checkpoint的支持,如果从未触发checkpoint前提的话,则失利依此类推作法是不会生效的。这个是通过搜索 ElasticsearchSinkBase 类软件包的时候发掘出的,核心的code如下:
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// no initialization needed
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkErrorAndRethrow();
//如果从未触发checkPoint前提,则该变量为false,也就随之而来下面的flush依此类推code不可能会制订到
if (flushOnCheckpoint) {
do {
//失利依此类推的时机是发生在程序在打checkpoint的时候
bulkProcessor.flush();
checkErrorAndRethrow();
} while (numPendingRequests.get() != 0);
}
}
3. 揭示
可以通过第二点贴出的软件包发掘出,虽然EsSink做到到了 CheckpointedFunction 应用程序,并且重写了checkPoint的相关新方法,但其并从未墨守成规的并用checkpoint假定的那样并用State前提运用于超载恢复。而是并用了checkpoint的于是便,定时制订的框架来做到到了自己的一套失利依此类推前提。
文章转贴意指大数据资料技术开发与Core
力荐朗读:
大数据资料共同开发之Flink sql 的基础专有名词
大数据资料志愿Flink基础知识互动
大数据资料志愿Flink应征一部
大数据资料志愿应征Flink八股文互动
。中年女人吃什么补品好威海看白癜风哪里比较好
宁波看白癜风去哪家医院比较好
长沙哪个男科医院最好
成都甲状腺专科医院哪里好
-
60岁妻子分家产,百万存款和一套房,两个儿媳说:谁要谁倒霉
小正和小斌是一家人两兄弟,祖母坚定不移了大半辈子,没人也有一点总成绩。四个人依然住在爷爷奶奶遗留的那套小房子从前,日后两个幼子都外出离家工作,就剩祖祖母俩个人在家。不过日后外祖父也因为一场车祸离
- 在国企,不说10种话,不做10种事,让你笑看领导社会的发展,逍遥度日
- 人在职场,“多干多错,不干不错”, 这样的点子必须彻底抛弃!
- 男子被分手后翻墙进入女方家中,找对方儿子聊天遭拒后持刀伤人
- 父母”重男轻女“,女儿不得不成扶弟魔,母亲怒斥:弟弟才是亲生的
- 职场中年人就范吧,不要再任性的3条因素
- 裁员潮下,为什么被裁的是我?痛悟之后,我明白了四点
- 离职以后,该不该请公司的助手吃饭?请或不请,你可以这样判断
- 35岁以后,你才会明白的职场道理:真的别如此一来为工作而拼命了
- 我和女同有事出差,一时糊涂发生了不该做的有事,该不该给老婆坦白?
- 陌生人带男子回家过夜,半夜报警,陌生人:自愿发生关系偷手机不行