首页 昆明信息港 下载客户端 关于彩龙 论坛须知

pyflink利用生成器yield返回数据

        我的应用场景是这样的:需要读取一条json格式的信息,这天json信息当中,有可能出现某一个部分的是一个列表,然后需要将列表中信息提取出来,和其他的信息拼接,这样就将一条信息拆分成了多条消息,我在pyflink的使用过程中就遇到不会拆分的问题,最终通过研究官方文档,利用了生成器解决了这个问题。

 

首先还是要初始化pyflink的环境,创建Table Environment

from pyflink.datastream import StreamExecutionEnvironment

from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, TableFunction

from pyflink.table.udf import udf, udtf

# 创建Table Environment, 并选择使用的Planner

env = StreamExecutionEnvironment.get_execution_environment()

t_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

 

接着写关于kafka数据库的ddl

kafka_source_ddl = """ CREATE TABLE my_source_kafka (

payload STRING  #要取的Key的名字

) WITH (

'connector' = 'kafka', #数据库为kafka

'topic' = 'xxxx', #kafka当中的topic的名字,相当于一张表

'properties.group.id' = 'testGroup', #kafka的消费组,如果用已有的名字,就是使用了它的偏移量,如果是新的就是从头开始。

'properties.bootstrap.servers' = 'x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092', #kafka集群的ip和端口

'scan.startup.mode' = 'earliest-offset',

'format' = 'json', #读取的格式是json

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true' #出现错误怎样处理

) """

 

接下来是mysql的ddl配置

mysql_sink_ddl ="""CREATE TABLE MySinkTable (

log1 STRING, #要入的mysql的2个字段的名字

log2 STRING

) WITH (

'connector' = 'jdbc',#jdbc的引擎

'url' = 'jdbc:mysql://x.x.x.:4000/库名',#mysql的地址和端口,和库的名字

'table-name' = 'ods_countly_test', #表的名字

'driver' = 'com.mysql.cj.jdbc.Driver',

'username' = 'xx',#用户名

'password' = 'xx'#密码

) """

 

接来是注册一个udtf的自定义函数

@udtf(input_types = DataTypes.STRING(),result_types=[DataTypes.STRING(),DataTypes.STRING()])

def preprocess(payload):

      tes = json.loads(payload)

      x = tes.get('after')

      tes_js = json.loads(x)

      d = tes_js.get('d')

      device_id = d.get('id')

      device_type = d.get('p') 

      if device_type_list :  #这里假设这个字段是列表

          for device_type in device_type_list :

              yield device_id, device_type #这里如果采用的是return的话,函数直接就结束了,所以只能使用yield来返回数值,然后在后边的sql里面使用自带的函数就可以将一条数据分解为多个,并且返回给pyflink 进入数据库当中。

      else:

          return device_id, device_type

接来注册udf函数和ddl

t_env.create_temporary_system_function("preprocess", preprocess)

t_env.sql_update(kafka_source_ddl)

t_env.sql_update(mysql_sink_ddl)

然后写SQL语句就好啦

t_env.sql_update("INSERT INTO MySinkTable SELECT os_version, device_type FROM my_source_kafka, LATERAL TABLE(preprocess(payload)) as MySinkTable(os_version, device_type)")

执行

t_env.execute("kafka_sql")

网友评论

0条评论

发表

网友评论

0条评论

发表

最新评论

推荐文章

彩龙

Copyright © 2008-2021 彩龙社区 版权所有 All Rights Reserved.

免责声明: 本网不承担任何由内容提供商提供的信息所引起的争议和法律责任。

经营许可证编号:滇B2-20090009-7

下载我家昆明APP 下载彩龙社区APP