Skip to content

Commit 683521f

Browse files
committed
finish tps test
1 parent 5ce05b8 commit 683521f

9 files changed

+510
-4
lines changed

doc/shell/start_job.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
#!/bin/bash
# Launch helper: resolve the script's own directory so relative paths work
# no matter which directory the caller invokes it from.
basedir=$(cd "$(dirname "$0")" && pwd -P)   # quoted so paths with spaces survive; && so a failed cd cannot yield the wrong dir
echo "$basedir"
cd "$basedir" || exit 1                     # abort instead of running from an unexpected directory
export FLINK_HOME=/opt/flink-1.15.1
export PATH=$PATH:$FLINK_HOME/bin
export HADOOP_CLASSPATH="$(hadoop classpath)"

doc/shell/start_pre_job.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
#!/bin/bash

# Resolve the script's own directory so relative paths work from any CWD.
basedir=$(cd "$(dirname "$0")" && pwd -P)   # quoted against spaces; && so a failed cd cannot yield the wrong dir
cd "$basedir" || exit 1                     # abort instead of running from an unexpected directory
export FLINK_HOME=/opt/flink-1.15.1
export PATH=$PATH:$FLINK_HOME/bin
export HADOOP_CLASSPATH="$(hadoop classpath)"

doc/shell/start_session.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
#!/bin/bash

# Resolve the script's own directory so relative paths work from any CWD.
basedir=$(cd "$(dirname "$0")" && pwd -P)   # quoted against spaces; && so a failed cd cannot yield the wrong dir
cd "$basedir" || exit 1                     # abort instead of running from an unexpected directory
export FLINK_HOME=/opt/flink-1.15.1
export PATH=$PATH:$FLINK_HOME/bin
export HADOOP_CLASSPATH="$(hadoop classpath)"

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
-- Lookup Source: Sync Mode
-- Kafka source: raw user_log click-stream events.
CREATE TABLE user_log
(
    user_id      STRING,
    item_id      STRING,
    category_id  STRING,
    behavior     STRING,
    page         STRING,
    `position`   STRING,
    sort         STRING,
    last_page    STRING,
    next_page    STRING,
    ts           TIMESTAMP(3),
    process_time AS proctime(), -- processing-time attribute; required by the lookup join below
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka'
    ,'topic' = 'user_log'
    ,'properties.bootstrap.servers' = 'dcmp10:9092,dcmp11:9092,dcmp12:9092'
    ,'properties.group.id' = 'user_log'
    ,'scan.startup.mode' = 'latest-offset'
    ,'format' = 'json'
);

drop table if exists hbase_behavior_conf;
-- HBase dimension table: rowkey is user_id, attributes live in column family `f`.
CREATE
TEMPORARY TABLE hbase_behavior_conf (
    user_id STRING,
    f ROW(sex STRING,
          age INTEGER,
          degree STRING,
          address STRING,
          work_address STRING,
          income_range STRING,
          default_shipping_address STRING,
          register_date TIMESTAMP(3),
          udpate_date TIMESTAMP(3)) -- NOTE(review): "udpate_date" looks like a typo for "update_date"; kept to match the HBase qualifier — confirm
) WITH (
    'connector' = 'hbase-2.2'
    ,'zookeeper.quorum' = 'dcmp10:2181,dcmp11:2181,dcmp12:2181'
    ,'zookeeper.znode.parent' = '/hbase'
    ,'table-name' = 'user_info'
    ,'lookup.cache.max-rows' = '100000'
    ,'lookup.cache.ttl' = '10 minute' -- a cached row expires only after this long without being touched
    ,'lookup.async' = 'true'
);

---sinkTable
CREATE TABLE user_log_sink
(
    user_id                  STRING,
    item_id                  STRING,
    category_id              STRING,
    behavior                 STRING,
    page                     STRING,
    `position`               STRING,
    sort                     STRING,
    last_page                STRING,
    next_page                STRING,
    ts                       TIMESTAMP(3),
    sex                      STRING,
    age                      INTEGER,
    degree                   STRING,
    address                  STRING,
    work_address             STRING,
    income_range             STRING,
    default_shipping_address STRING,
    register_date            TIMESTAMP(3),
    udpate_date              TIMESTAMP(3)
    -- ,primary key (user_id) not enforced
) WITH (
    'connector' = 'kafka'
    ,'topic' = 'user_log_sink'
    ,'properties.bootstrap.servers' = 'dcmp10:9092,dcmp11:9092,dcmp12:9092'
    ,'properties.group.id' = 'user_log'
    ,'scan.startup.mode' = 'group-offsets'
    ,'format' = 'json'
);

-- Enrich each event with user attributes via a processing-time lookup join.
INSERT INTO user_log_sink
SELECT a.user_id
     ,a.item_id
     ,a.category_id
     ,a.behavior
     ,a.page
     ,a.`position`
     ,a.sort
     ,a.last_page
     ,a.next_page
     ,a.ts
     ,b.sex -- NOTE(review): the dimension table declares these fields nested in ROW `f`; presumably b.f.sex etc. is needed — verify this resolves
     ,b.age
     ,b.degree
     ,b.address
     ,b.work_address
     ,b.income_range
     ,b.default_shipping_address
     ,b.register_date
     ,b.udpate_date
FROM user_log a
    left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS b
        ON a.user_id = b.user_id
where a.behavior is not null;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
-- Lookup Source: Sync Mode
-- Kafka source: raw user_log click-stream events.
CREATE TABLE user_log
(
    user_id      STRING,
    item_id      STRING,
    category_id  STRING,
    behavior     STRING,
    page         STRING,
    `position`   STRING,
    sort         STRING,
    last_page    STRING,
    next_page    STRING,
    ts           TIMESTAMP(3),
    process_time AS proctime(), -- processing-time attribute; required by the lookup join below
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka'
    ,'topic' = 'user_log'
    ,'properties.bootstrap.servers' = 'dcmp10:9092,dcmp11:9092,dcmp12:9092'
    ,'properties.group.id' = 'user_log'
    ,'scan.startup.mode' = 'latest-offset'
    ,'format' = 'json'
);

drop table if exists mysql_behavior_conf;
-- MySQL dimension table: user attributes keyed by user_id.
CREATE
TEMPORARY TABLE mysql_behavior_conf (
    user_id STRING,
    sex STRING,
    age INTEGER,
    degree STRING,
    address STRING,
    work_address STRING,
    income_range STRING,
    default_shipping_address STRING,
    register_date TIMESTAMP(3),
    udpate_date TIMESTAMP(3) -- NOTE(review): "udpate_date" looks like a typo for "update_date"; kept to match the MySQL column — confirm
) WITH (
    'connector' = 'jdbc'
    ,'url' = 'jdbc:mysql://10.201.0.166:3306/shell1'
    ,'table-name' = 'user_info'
    ,'username' = 'root'
    ,'password' = 'daas2020'
    -- ,'scan.partition.column' = 'id'
    -- ,'scan.partition.num' = '5'
    -- ,'scan.partition.lower-bound' = '5'
    -- ,'scan.partition.upper-bound' = '99999'
    ,'lookup.cache.max-rows' = '100000'
    ,'lookup.cache.ttl' = '10 minute' -- a cached row expires only after this long without being touched
    -- ,'lookup.async' = 'true'
);


---sinkTable
CREATE TABLE user_log_sink
(
    user_id                  STRING,
    item_id                  STRING,
    category_id              STRING,
    behavior                 STRING,
    page                     STRING,
    `position`               STRING,
    sort                     STRING,
    last_page                STRING,
    next_page                STRING,
    ts                       TIMESTAMP(3),
    sex                      STRING,
    age                      INTEGER,
    degree                   STRING,
    address                  STRING,
    work_address             STRING,
    income_range             STRING,
    default_shipping_address STRING,
    register_date            TIMESTAMP(3),
    udpate_date              TIMESTAMP(3)
    -- ,primary key (user_id) not enforced
) WITH (
    'connector' = 'kafka'
    ,'topic' = 'user_log_sink'
    ,'properties.bootstrap.servers' = 'dcmp10:9092,dcmp11:9092,dcmp12:9092'
    ,'properties.group.id' = 'user_log'
    ,'scan.startup.mode' = 'group-offsets'
    ,'format' = 'json'
);

-- Enrich each event with user attributes via a processing-time lookup join.
INSERT INTO user_log_sink
SELECT a.user_id
     ,a.item_id
     ,a.category_id
     ,a.behavior
     ,a.page
     ,a.`position`
     ,a.sort
     ,a.last_page
     ,a.next_page
     ,a.ts
     ,b.sex
     ,b.age
     ,b.degree
     ,b.address
     ,b.work_address
     ,b.income_range
     ,b.default_shipping_address
     ,b.register_date
     ,b.udpate_date
FROM user_log a
    left join mysql_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS b
        ON a.user_id = b.user_id
where a.behavior is not null;
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
-- Lookup Source: Sync Mode
-- Kafka source: raw user_log click-stream events.
CREATE TABLE user_log (
    user_id STRING
    ,item_id STRING
    ,category_id STRING
    ,behavior STRING
    ,ts TIMESTAMP(3)
    ,process_time as proctime() -- processing-time attribute; required by the lookup join below
    , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka'
    ,'topic' = 'user_log'
    ,'properties.bootstrap.servers' = 'localhost:9092'
    ,'properties.group.id' = 'user_log'
    ,'scan.startup.mode' = 'latest-offset'
    ,'format' = 'json'
);

-- Custom Redis dimension table (cust-redis connector): key / field / value triples.
CREATE TEMPORARY TABLE redis_table (
    `key` STRING
    ,filed STRING
    ,`value` STRING
) WITH (
    'connector' = 'cust-redis'
    ,'redis.url' = 'redis://localhost:6379?timeout=3000'
    ,'lookup.cache.max.size' = '28'
    ,'lookup.cache.expire.ms' = '3600000' -- a cached entry expires only after this long without being touched
    -- ,'pass' = '11' -- todo test
);

---sinkTable
-- NOTE(review): "kakfa" in the sink names looks like a typo for "kafka"; harmless but consider renaming.
CREATE TABLE kakfa_join_redis_sink (
    user_id STRING
    ,item_id STRING
    ,category_id STRING
    ,behavior STRING
    ,behavior_map STRING
    ,ts TIMESTAMP(3)
    ,primary key (user_id) not enforced
) WITH (
    'connector' = 'print'
);
-- sting/list/set/zset test sql
-- INSERT INTO kakfa_join_redis_sink(user_id, item_id, category_id, behavior, behavior_map, ts)
-- SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.`value`, a.ts
-- FROM user_log a
-- left join redis_table FOR SYSTEM_TIME AS OF a.process_time AS b
-- ON a.behavior = b.`key`
-- where a.behavior is not null;

CREATE TABLE kakfa_join_redis_sink_1 (
    user_id STRING
    ,item_id STRING
    ,category_id STRING
    ,behavior STRING
    ,behavior_key STRING
    ,behavior_map STRING
    ,ts TIMESTAMP(3)
    ,primary key (user_id) not enforced
) WITH (
    'connector' = 'print'
)
;


-- hash multiple input
INSERT INTO kakfa_join_redis_sink_1(user_id, item_id, category_id, behavior, behavior_key,behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior,b.filed, b.`value`, a.ts
FROM user_log a
    left join redis_table FOR SYSTEM_TIME AS OF a.process_time AS b
        ON a.behavior = b.key -- NOTE(review): the table declares this column as `key` (backquoted); this unquoted use may need backticks — verify it parses
where a.behavior is not null;

-- INSERT INTO kakfa_join_redis_sink_1(user_id, item_id, category_id, behavior, behavior_key,behavior_map, ts)
-- SELECT a.user_id, a.item_id, a.category_id, a.behavior,b.filed, b.`value`, a.ts
-- FROM user_log a
-- left join redis_table FOR SYSTEM_TIME AS OF a.process_time AS b
-- ON a.behavior = b.key and a.item = b.filed
-- where a.behavior is not null;

0 commit comments

Comments
 (0)