首页 昆明信息港 下载客户端 关于彩龙 论坛须知

在pyflink中使用流与table的转换

1.第一步还是创建环境,可以看我前面的文章,详细讲解了如何创建pyflink环境

2。注册ddl,今天使用2个kafka的ddl

ddl1 =  CREATE TABLE content_security_stream_darklink_result (
                    data STRING
                ) WITH (
                    'connector' = 'kafka','topic' = 'content_security_stream_darklink_result','properties.bootstrap.servers' = '192.168.20.100:9092, 192.168.20.101:9092, 192.168.20.102:9092','format' = 'json'
                )


ddl2 =  CREATE TABLE content_security_batch_pipe_darklink (
                    data STRING
                ) WITH (
                    'connector' = 'kafka','property-version' = 'universal','properties.group.id' = 'badlink_batch_darklink1','topic' = 'content_security_batch_pipe','scan.startup.mode' = 'group-offsets','properties.bootstrap.servers' = '192.168.20.100:9092, 192.168.20.101:9092, 192.168.20.102:9092','format' = 'json'
                )

3注册ddl和各种函数,这里不细讲了,前面的文章讲很详细


4.在pyflink中进行流与table的转换

(1)首先查一个数据表出来

data = self.st_env.from_path("content_security_batch_pipe_darklink").select("data")

这里查询出来的是一个table的类型,但是我们很多时候是需要流式数据的,并且这个函数还有个巨大的好处,就是可以有内置函数去更新缓存,每次执行任务的时候先执行缓存的函数,再执行后续的逻辑

(2)将table的数据转换为流失数据,使用的to_append_stream方法,其中参数必须由Types.TUPLE的格式进去,然后是一个列表,其中代表了多个参数,取的时候也可以根据列表的下标方法取。

data_stream = self.st_env.to_append_stream(data, type_info=Types.TUPLE([Types.STRING()]))

(3)将数据转换为流式数据以后,需要进行逻辑上的操作。

final_stream = data_stream.flat_map(FooFlatMap(), result_type=Types.ROW([Types.STRING()]))

其中的FooFlatMap()函数就是我们主要逻辑的函数,他长这样:

class FooFlatMap(FlatMapFunction):

def open(self, runtime_context: RuntimeContext):

        pass

def flat_map(self, value):

        yield Row(str(value))

这个函数继承了pyflink的系统内置函数FlatMapFunction,其中的open函数,就是这个程序每次执行的时候的初始化函数,每次程序启动,都会先执行open函数,那么就可以在open函数当中使用许多初始化的参数,也可以再后续代码中重复执行open函数来更新自己的数值,就可以保证数据依赖随时可以更新。

然后我们flat_map函数就是我们的业务逻辑所在的函数,在这里编写自己的业务逻辑,其中返回一定要是用Row这个方法包起来,才可以在上边result_type当中使用Types.ROW([Types.STRING()]))取出返回的数据。

(4)接下来就是将流式数据再转换为table数据

final_table = self.st_env.from_data_stream(final_stream)

很灵活的使用流式数据和table数据,在插入数据的时候就不受限制,做出很多变化来适应业务需求。

(5)将数据插入表中

final_table.insert_into("content_security_stream_darklink_result ")

(6)执行任务

self.st_env.execute("darklink_check")

以上的全部流程,就是执行了一个从kafka当中读出table数据,然后将table数据转换为流式数据,经过流式的业务逻辑处理,再转换为table数据插入kafka中,其中各种操作灵活多变,可以选择插入数据的数据格式,并且能够初始化数据,实现数据依赖在代码中实时更新,大大增加了pyflink的三个的实用性,个人认为非常的实用。

网友评论

0条评论

发表

网友评论

0条评论

发表

最新评论

推荐文章

彩龙

Copyright © 2008-2021 彩龙社区 版权所有 All Rights Reserved.

免责声明: 本网不承担任何由内容提供商提供的信息所引起的争议和法律责任。

经营许可证编号:滇B2-20090009-7

下载我家昆明APP 下载彩龙社区APP