首页 昆明信息港 下载客户端 关于彩龙 论坛须知

pyflink读取kafka数据入mysql

首先还是要初始化pyflink的环境,创建Table Environment

from pyflink.datastream import StreamExecutionEnvironment

from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, TableFunction

from pyflink.table.udf import udf, udtf

# 创建Table Environment, 并选择使用的Planner

env = StreamExecutionEnvironment.get_execution_environment()

t_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

 

接着写关于kafka数据库的ddl

kafka_source_ddl = """ CREATE TABLE my_source_kafka (

payload STRING  #要取的Key的名字

) WITH (

'connector' = 'kafka', #数据库为kafka

'topic' = 'xxxx', #kafka当中的topic的名字,相当于一张表

'properties.group.id' = 'testGroup', #kafka的消费组,如果用已有的名字,就是使用了它的偏移量,如果是新的就是从头开始。

'properties.bootstrap.servers' = 'x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092', #kafka集群的ip和端口

'scan.startup.mode' = 'earliest-offset',

'format' = 'json', #读取的格式是json

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true' #出现错误怎样处理

) """

 

接下来是mysql的ddl配置

mysql_sink_ddl ="""CREATE TABLE MySinkTable (

log1 STRING, #要入的mysql的2个字段的名字

log2 STRING

) WITH (

'connector' = 'jdbc',#jdbc的引擎

'url' = 'jdbc:mysql://x.x.x.:4000/库名',#mysql的地址和端口,和库的名字

'table-name' = 'ods_countly_test', #表的名字

'driver' = 'com.mysql.cj.jdbc.Driver',

'username' = 'xx',#用户名

'password' = 'xx'#密码

) """

 

接来是一个大坑,一定要注意

@udtf(input_types = DataTypes.STRING(),result_types=[DataTypes.STRING(),DataTypes.STRING()])

def preprocess(payload):

      tes = json.loads(payload)

      x = tes.get('after')

      tes_js = json.loads(x)

      d = tes_js.get('d')

      device_id = d.get('id')

      device_type = d.get('p') 

      return device_id, device_type

上面这个自定义的udf,如果你要返回多个数据,那么就需要使用udtf这个装饰器,并且后面的result_types也需要按照返回数据的格式依次对应的格式。

具体udf的特殊用法可以参考官方文档 

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/python/table-api-users-guide/udfs/python_udfs.html

接来注册udf函数和ddl

t_env.create_temporary_system_function("preprocess", preprocess)

t_env.sql_update(kafka_source_ddl)

t_env.sql_update(mysql_sink_ddl)

然后是写sql

t_env.sql_update("INSERT INTO MySinkTable SELECT os_version, device_type FROM my_source_kafka, LATERAL TABLE(preprocess(payload)) as MySinkTable(os_version, device_type)")

记住这里的写法,一定要模仿正确,要不然会执行不成功。

# 执行作业

t_env.execute("kafka_sql")

 

 

这样就完成了从kafka里面读出json数据,然后经过自定义的udf函数解析,完成数据进去mysql的整个流程,上面强调的终点就是我踩的坑,希望大家注意。

网友评论

0条评论

发表

网友评论

0条评论

发表

最新评论

推荐文章

彩龙

Copyright © 2008-2021 彩龙社区 版权所有 All Rights Reserved.

免责声明: 本网不承担任何由内容提供商提供的信息所引起的争议和法律责任。

经营许可证编号:滇B2-20090009-7

下载我家昆明APP 下载彩龙社区APP