Parsing canal-json data with Flink

Date: 2021-06-25 16:38:01

Add the fastjson dependency:

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.33</version>
    </dependency>
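
The post only lists fastjson, but the code below also needs the Flink streaming Scala API and the Flink Kafka connector on the classpath. A minimal sketch of the extra Maven dependencies, assuming a Scala 2.11 build and a `flink.version` property defined in the POM (adjust the Scala suffix and version to your environment):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>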

  
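For context, each Kafka record produced by canal in JSON mode carries the changed rows plus metadata about the change. A simplified example of an INSERT message (field values here are made up; only the `table`, `type`, and `data` keys matter to the job below):

    {
      "data": [
        {"id": "1", "set_id": "1", "creat_time": "2021-06-25 16:00:00"}
      ],
      "database": "bymm",
      "table": "epidemic_report",
      "type": "INSERT",
      "ts": 1624608000000
    }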

    import java.text.SimpleDateFormat
    import java.util.Date

    import com.alibaba.fastjson.JSON
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date) + " flink job starting")
    val begin_date = new EQTJStreamUtil().getParamDate(ParameterTool.fromArgs(args))
    println(begin_date)

    // add the Kafka source; EQTJStreamUtil and Dxxbs_epidemic_report are project-specific classes (see sketch below)
    val reportStreamSource = env
      .addSource(new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), new EQTJStreamUtil().getKafkaProps())
        .setStartFromEarliest())                                  // start consuming from the earliest Kafka offset
      .map(JSON.parseObject(_))                                   // parse each canal-json message into a JSONObject
      .filter(_.get("table") == "epidemic_report")                // keep only changes to the epidemic_report table
      .filter(_.get("type").toString.matches("(INSERT|UPDATE)"))  // keep only INSERT and UPDATE events
      .map(_.getJSONArray("data").getObject(0, classOf[Dxxbs_epidemic_report])) // map the first row in "data" onto the bean
      // .filter(_.getSet_id == "1")
      .filter(_.getCreat_time > begin_date)                       // keep rows created after the start date
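
The snippet relies on two project-specific classes that the post does not show: EQTJStreamUtil (Kafka properties and the start-date parameter) and the Dxxbs_epidemic_report bean that fastjson maps each row of "data" onto. A minimal sketch of the bean, assuming only the two fields actually referenced above; the real class would carry one field per column of the table:

    import scala.beans.BeanProperty

    // Hypothetical sketch: fastjson needs a no-arg constructor and bean-style
    // getters/setters, which @BeanProperty generates (getSet_id, getCreat_time, ...).
    // Field names must match the column names in the canal-json "data" objects.
    class Dxxbs_epidemic_report {
      @BeanProperty var set_id: String = _
      @BeanProperty var creat_time: String = _
      // ...remaining columns of the epidemic_report table
    }

As written, the pipeline is only built, never run; a sink and a call to env.execute() are still needed, for example:

    reportStreamSource.print()                     // simple sink for inspecting the filtered records
    env.execute("epidemic_report canal-json job")  // job name is illustrative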

 
