-
Notifications
You must be signed in to change notification settings - Fork 88
/
Copy path2-from_kafka_to_kafka.py
96 lines (84 loc) · 3.62 KB
/
2-from_kafka_to_kafka.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Kafka, Json
def from_kafka_to_kafka_demo():
    """Run a pass-through streaming job from the "source" table to "sink".

    Creates a streaming execution environment with parallelism 1, wraps it
    in a Blink-planner StreamTableEnvironment, registers the Kafka-backed
    ``source`` and ``sink`` tables, inserts every ``source`` row into
    ``sink`` and submits the job as "2-from_kafka_to_kafka".
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    # NOTE(review): neither registered table declares a rowtime attribute,
    # so event time appears unused by this pipeline; kept as configured.
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env.set_parallelism(1)

    # Table environment backed by the Blink planner in streaming mode.
    blink_settings = EnvironmentSettings \
        .new_instance() \
        .in_streaming_mode() \
        .use_blink_planner() \
        .build()
    table_env = StreamTableEnvironment.create(
        env, environment_settings=blink_settings)

    # Both tables must exist before they can be referenced by name.
    register_rides_source(table_env)
    register_rides_sink(table_env)

    # Straight copy: the source's last column (eventTime) lines up with the
    # sink's last column (rideTime) — presumably mapped by position; confirm.
    rides = table_env.from_path("source")
    rides.insert_into("sink")

    # Submit the job under a fixed name.
    table_env.execute("2-from_kafka_to_kafka")
def register_rides_source(st_env, topic="Rides",
                          bootstrap_servers="kafka:9092",
                          zookeeper_connect="zookeeper:2181",
                          table_name="source"):
    """Register an append-only, Kafka-backed table of taxi-ride records.

    Records are read as JSON from ``topic``, starting at the earliest
    offset, and exposed as a temporary table named ``table_name``.

    :param st_env: the StreamTableEnvironment to register the table in.
    :param topic: Kafka topic to consume (default ``"Rides"``).
    :param bootstrap_servers: value for the Kafka ``bootstrap.servers``
        property (default ``"kafka:9092"``).
    :param zookeeper_connect: value for the ``zookeeper.connect`` property
        (default ``"zookeeper:2181"``).
    :param table_name: name of the temporary table to create (default
        ``"source"``, the name ``from_kafka_to_kafka_demo`` reads from).
    """
    # External system: Kafka via the "universal" connector.
    kafka = Kafka() \
        .version("universal") \
        .topic(topic) \
        .start_from_earliest() \
        .property("zookeeper.connect", zookeeper_connect) \
        .property("bootstrap.servers", bootstrap_servers)
    # NOTE(review): zookeeper.connect is probably ignored by the universal
    # Kafka client; it is kept so the registered properties are unchanged.

    # Wire format: JSON; a record missing any of these fields is an error.
    # The field order here differs from the table schema below; JSON fields
    # are presumably matched by name — confirm before relying on position.
    json_format = Json() \
        .fail_on_missing_field(True) \
        .schema(DataTypes.ROW([
            DataTypes.FIELD("rideId", DataTypes.BIGINT()),
            DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
            DataTypes.FIELD("eventTime", DataTypes.STRING()),
            DataTypes.FIELD("lon", DataTypes.FLOAT()),
            DataTypes.FIELD("lat", DataTypes.FLOAT()),
            DataTypes.FIELD("psgCnt", DataTypes.INT()),
            DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))

    # Table schema as seen by queries.
    table_schema = Schema() \
        .field("rideId", DataTypes.BIGINT()) \
        .field("taxiId", DataTypes.BIGINT()) \
        .field("isStart", DataTypes.BOOLEAN()) \
        .field("lon", DataTypes.FLOAT()) \
        .field("lat", DataTypes.FLOAT()) \
        .field("psgCnt", DataTypes.INT()) \
        .field("eventTime", DataTypes.STRING())

    st_env \
        .connect(kafka) \
        .with_format(json_format) \
        .with_schema(table_schema) \
        .in_append_mode() \
        .create_temporary_table(table_name)
def register_rides_sink(st_env, topic="TempResults",
                        bootstrap_servers="kafka:9092",
                        zookeeper_connect="zookeeper:2181",
                        table_name="sink"):
    """Register an append-only, Kafka-backed output table for ride records.

    Rows inserted into the temporary table ``table_name`` are written as
    JSON to ``topic``.

    :param st_env: the StreamTableEnvironment to register the table in.
    :param topic: Kafka topic to write to (default ``"TempResults"``).
    :param bootstrap_servers: value for the Kafka ``bootstrap.servers``
        property (default ``"kafka:9092"``).
    :param zookeeper_connect: value for the ``zookeeper.connect`` property
        (default ``"zookeeper:2181"``).
    :param table_name: name of the temporary table to create (default
        ``"sink"``, the name ``from_kafka_to_kafka_demo`` inserts into).
    """
    # External system: Kafka via the "universal" connector.
    kafka = Kafka() \
        .version("universal") \
        .topic(topic) \
        .property("zookeeper.connect", zookeeper_connect) \
        .property("bootstrap.servers", bootstrap_servers)
    # NOTE(review): zookeeper.connect is probably ignored by the universal
    # Kafka client; it is kept so the registered properties are unchanged.

    # Wire format: JSON with the same field order as the table schema.
    # NOTE(review): fail_on_missing_field is a read-side (deserialization)
    # option and likely has no effect on a sink; kept for property parity.
    json_format = Json() \
        .fail_on_missing_field(True) \
        .schema(DataTypes.ROW([
            DataTypes.FIELD("rideId", DataTypes.BIGINT()),
            DataTypes.FIELD("taxiId", DataTypes.BIGINT()),
            DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
            DataTypes.FIELD("lon", DataTypes.FLOAT()),
            DataTypes.FIELD("lat", DataTypes.FLOAT()),
            DataTypes.FIELD("psgCnt", DataTypes.INT()),
            DataTypes.FIELD("rideTime", DataTypes.STRING())
        ]))

    # Table schema as seen by INSERT statements.
    table_schema = Schema() \
        .field("rideId", DataTypes.BIGINT()) \
        .field("taxiId", DataTypes.BIGINT()) \
        .field("isStart", DataTypes.BOOLEAN()) \
        .field("lon", DataTypes.FLOAT()) \
        .field("lat", DataTypes.FLOAT()) \
        .field("psgCnt", DataTypes.INT()) \
        .field("rideTime", DataTypes.STRING())

    st_env \
        .connect(kafka) \
        .with_format(json_format) \
        .with_schema(table_schema) \
        .in_append_mode() \
        .create_temporary_table(table_name)
if __name__ == '__main__':
    # Build and submit the job only when run as a script, not on import.
    from_kafka_to_kafka_demo()