当前位置: 首页 > news >正文

微信运营有前途吗seo搜索引擎优化的内容

微信运营有前途吗,seo搜索引擎优化的内容,公共服务标准化试点,邳州做网站pzwode问题描述 不同情况下需要找对应的解决方法,这里介绍的解决方法不能拓展到别的场景。 场景描述: flink job 的开发过程中遇到这样的需求,需要先 map 处理,然后把返回的 DataStream 作为输入,流入别的 map 中。这里我们遇…

问题描述

不同情况下需要找对应的解决方法,这里介绍的解决方法不能拓展到别的场景。

场景描述: flink job 的开发过程中遇到这样的需求,需要先 map 处理,然后把返回的 DataStream 作为输入,流入别的 map 中。这里我们遇到的场景是从原来的 map 流到 AsyncDataStream 中。
大概的 java 代码为:

        SingleOutputStreamOperator<xxxxx> task = source.rebalance().map(new XXXXMapFunction()).uid("xxxxxx").name("xxxx");DataStream<yyyyy> dataStreamHyKontrast = AsyncDataStream.unorderedWait(task, new YYYYRequestMapFunction(), defaultTimeoutMills, TimeUnit.SECONDS).name("yyyyyy").uid("yyyyyy");.......

问题描述: 开发完成本地运行flink入口 main 方法的时候,错误提示如下:

Caused by: java.lang.RuntimeException: Assigned key must not be null!at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103)at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272)at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:298)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:78)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:371)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:352)at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)at java.lang.Thread.run(Thread.java:750)

解决方法

我们避免数据直接从第一个 map 过程后直接流向第二个 AsynMap,中间添加一个处理过程,尽管这个处理过程我们啥也不干。

        SingleOutputStreamOperator<xxxxx> task = source.rebalance().map(new XXXXMapFunction()).uid("xxxxxx").name("xxxx");// 这里添加一个处理过程,task -> process ,然后process到下一个 mapSingleOutputStreamOperator<KontrastAlgoTaskStated> process = task.process(new DefaultProcessFunction<>());DataStream<yyyyy> dataStreamHyKontrast = AsyncDataStream.unorderedWait(process, new YYYYRequestMapFunction(), defaultTimeoutMills, TimeUnit.SECONDS).name("yyyyyy").uid("yyyyyy");

其中的 DefaultProcessFunction 是我自己编写的,就是什么也不做。

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;/*** 默认的 collect,用于处理 map 后不做任何处理直接到下一个结点* @author smileyan* @param <IN> 对应的实体类*/
public final class DefaultProcessFunction<IN> extends ProcessFunction<IN, IN> {@Overridepublic void processElement(IN value, ProcessFunction<IN, IN>.Context ctx, Collector<IN> out) throws Exception {out.collect(value);}
}

总结

遇到问题后查了一下,都没有找到我这种情况的博客。后来折腾了一会儿发现如上方法可以解决问题,特此记录,希望可能帮到遇到相同问题的小伙伴 ~ 感谢阅览 ~

Smileyan
2023.07.18 10:12

http://www.rdtb.cn/news/22281.html

相关文章:

  • 可以找人帮忙做设计的网站百度惠生活商家怎么入驻
  • 用wordpress建站会不会显得水平差2023年4 5月份疫情结束吗
  • 做网站安全联盟解武汉网站seo
  • wordpress练习题windows优化大师值得买吗
  • 企业自建网站营销西安网站公司推广
  • 网站建设 中企动力洛阳分公司seo精华网站
  • 管理好员工的方法网站功能优化
  • 温州网站建设模板总部南京seo关键词排名
  • 苏州沧浪做网站哪家好网络营销论文
  • 这么做钓鱼网站活动策划公司
  • 温州微网站开发站长工具查询入口
  • 苏州网站设计kgwl线在科技成都网站推广公司
  • 万金娱乐网站开发疫情最新消息今天封城了
  • 网站外链建设实例互联网营销专业
  • 购买网站域名打字赚钱平台 学生一单一结
  • wordpress缓存目录百度seo排名培训 优化
  • 怎么知道一个网站是哪家公司做的国际新闻视频
  • php快速建网站百度网页版怎么切换
  • 商业活动的网站建设互联网舆情
  • 龙华区是深圳最差的区广州seo网站排名
  • 延吉网站建设郑州seo询搜点网络效果佳
  • 交易平台网站建设策划书天津短视频seo
  • 免费发外链的网站百度广告推广费用一年多少钱
  • 长春做网站选长春万网免费发布信息网
  • 沭阳苏奥产业园做网站营销型高端网站建设
  • 自制网站导航图怎么做哪家公司建设网站好
  • 诸暨哪些公司可以制作网站最新推广注册app拿佣金
  • 赣州网站制作公司百度营销推广
  • 优化wordpress访问速度seo网络科技有限公司
  • 自己做的网站怎么放上网软文范例100字以内