Skip to content

Commit 59229ff

Browse files
kalencayawangqi
and
wangqi
authored
[Feature][scaleph-ui-react] add flink-cdc web (#688)
* feature: update dataservice config web * feature: update sql script * feature: add flink-cdc web --------- Co-authored-by: wangqi <[email protected]>
1 parent a48db8d commit 59229ff

File tree

43 files changed

+1780
-10
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1780
-10
lines changed

pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@
169169
<kubernetes.client.version>6.2.0</kubernetes.client.version>
170170
<spring-cloud-openfeign.version>3.1.7</spring-cloud-openfeign.version>
171171
<zjsonpatch.version>0.4.14</zjsonpatch.version>
172+
<kogito.version>2.44.0.Alpha</kogito.version>
172173
</properties>
173174

174175
<dependencyManagement>
@@ -299,6 +300,11 @@
299300
<artifactId>scaleph-engine-seatunnel</artifactId>
300301
<version>${project.version}</version>
301302
</dependency>
303+
<dependency>
304+
<groupId>${project.groupId}</groupId>
305+
<artifactId>scaleph-engine-flink-cdc</artifactId>
306+
<version>${project.version}</version>
307+
</dependency>
302308
<dependency>
303309
<groupId>${project.groupId}</groupId>
304310
<artifactId>scaleph-engine-flink-kubernetes</artifactId>
@@ -712,6 +718,17 @@
712718
<artifactId>zjsonpatch</artifactId>
713719
<version>${zjsonpatch.version}</version>
714720
</dependency>
721+
722+
<dependency>
723+
<groupId>org.kie.kogito</groupId>
724+
<artifactId>kogito-spring-boot-starter</artifactId>
725+
<version>${kogito.version}</version>
726+
</dependency>
727+
<dependency>
728+
<groupId>org.kie.kogito</groupId>
729+
<artifactId>kogito-serverless-workflow-runtime</artifactId>
730+
<version>${kogito.version}</version>
731+
</dependency>
715732
</dependencies>
716733
</dependencyManagement>
717734

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package cn.sliew.scaleph.api.controller.ws;
20+
21+
import cn.sliew.scaleph.api.annotation.Logging;
22+
import cn.sliew.scaleph.common.exception.ScalephException;
23+
import cn.sliew.scaleph.dag.xflow.dnd.DndDTO;
24+
import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCDagService;
25+
import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCJobService;
26+
import cn.sliew.scaleph.engine.flink.cdc.service.dto.WsFlinkArtifactCDCDTO;
27+
import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCAddParam;
28+
import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCListParam;
29+
import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCSelectListParam;
30+
import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCUpdateParam;
31+
import cn.sliew.scaleph.plugin.framework.exception.PluginException;
32+
import cn.sliew.scaleph.system.model.ResponseVO;
33+
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
34+
import io.swagger.v3.oas.annotations.Operation;
35+
import io.swagger.v3.oas.annotations.tags.Tag;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.http.HttpStatus;
38+
import org.springframework.http.ResponseEntity;
39+
import org.springframework.validation.annotation.Validated;
40+
import org.springframework.web.bind.annotation.*;
41+
42+
import javax.validation.Valid;
43+
import java.util.List;
44+
45+
@Tag(name = "Flink CDC")
46+
@RestController
47+
@RequestMapping(path = "/api/flink-cdc")
48+
public class WsFlinkCDCController {
49+
50+
@Autowired
51+
private FlinkCDCDagService flinkCDCDagService;
52+
@Autowired
53+
private FlinkCDCJobService flinkCDCJobService;
54+
55+
@Logging
56+
@GetMapping("/dag/dnd")
57+
@Operation(summary = "查询DAG节点元信息", description = "后端统一返回节点信息")
58+
public ResponseEntity<ResponseVO<List<DndDTO>>> loadNodeMeta() throws PluginException {
59+
List<DndDTO> dnds = flinkCDCDagService.getDnds();
60+
return new ResponseEntity<>(ResponseVO.success(dnds), HttpStatus.OK);
61+
}
62+
63+
@Logging
64+
@GetMapping
65+
@Operation(summary = "查询 fink cdc 列表", description = "分页查询 fink cdc 列表")
66+
public ResponseEntity<Page<WsFlinkArtifactCDCDTO>> listJob(@Valid WsFlinkArtifactCDCListParam param) {
67+
Page<WsFlinkArtifactCDCDTO> page = flinkCDCJobService.listByPage(param);
68+
return new ResponseEntity<>(page, HttpStatus.OK);
69+
}
70+
71+
@Logging
72+
@GetMapping("/all")
73+
@Operation(summary = "查询 fink cdc 列表", description = "查询 fink cdc 列表")
74+
public ResponseEntity<List<WsFlinkArtifactCDCDTO>> listAll(@Valid WsFlinkArtifactCDCSelectListParam param) {
75+
List<WsFlinkArtifactCDCDTO> result = flinkCDCJobService.listAll(param);
76+
return new ResponseEntity<>(result, HttpStatus.OK);
77+
}
78+
79+
@Logging
80+
@PutMapping
81+
@Operation(summary = "新增 fink cdc", description = "新增 fink cdc,不涉及 DAG")
82+
public ResponseEntity<ResponseVO<WsFlinkArtifactCDCDTO>> simpleAddJob(@Validated @RequestBody WsFlinkArtifactCDCAddParam param) {
83+
WsFlinkArtifactCDCDTO wsFlinkArtifactCDCDTO = flinkCDCJobService.insert(param);
84+
return new ResponseEntity<>(ResponseVO.success(wsFlinkArtifactCDCDTO), HttpStatus.CREATED);
85+
}
86+
87+
@Logging
88+
@PostMapping
89+
@Operation(summary = "修改 fink cdc", description = "只修改 fink cdc 属性,不涉及 DAG")
90+
public ResponseEntity<ResponseVO> simpleEditJob(@Validated @RequestBody WsFlinkArtifactCDCUpdateParam param) {
91+
flinkCDCJobService.update(param);
92+
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
93+
}
94+
95+
@Logging
96+
@DeleteMapping("{id}")
97+
@Operation(summary = "删除 fink cdc", description = "删除 fink cdc")
98+
public ResponseEntity<ResponseVO> deleteJob(@PathVariable("id") Long id) throws ScalephException {
99+
flinkCDCJobService.delete(id);
100+
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
101+
}
102+
103+
@Logging
104+
@DeleteMapping("batch")
105+
@Operation(summary = "批量删除 fink cdc", description = "批量删除 fink cdc")
106+
public ResponseEntity<ResponseVO> deleteBatch(@RequestBody List<Long> ids) throws ScalephException {
107+
flinkCDCJobService.deleteBatch(ids);
108+
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
109+
}
110+
111+
@Logging
112+
@DeleteMapping("all")
113+
@Operation(summary = "批量删除 fink cdc", description = "批量删除 fink cdc")
114+
public ResponseEntity<ResponseVO> deleteAll(@RequestParam("flinkArtifactId") Long flinkArtifactId) throws ScalephException {
115+
flinkCDCJobService.deleteAll(flinkArtifactId);
116+
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
117+
}
118+
}

scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import cn.sliew.scaleph.common.dict.dataservice.QueryType;
2828
import cn.sliew.scaleph.common.dict.ds.RedisMode;
2929
import cn.sliew.scaleph.common.dict.flink.*;
30+
import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCVersion;
3031
import cn.sliew.scaleph.common.dict.flink.kubernetes.*;
3132
import cn.sliew.scaleph.common.dict.image.ImagePullPolicy;
3233
import cn.sliew.scaleph.common.dict.job.*;
@@ -110,6 +111,8 @@ public enum DictType implements DictDefinition {
110111

111112
IMAGE_PULL_POLICY("image_pull_policy", "Image Pull Policy", ImagePullPolicy.class),
112113

114+
FLINK_CDC_VERSION("flink_cdc_version", "Flink CDC Version", FlinkCDCVersion.class),
115+
113116
WORKFLOW_TYPE("workflow_type", "Workflow Type", WorkflowType.class),
114117
WORKFLOW_EXECUTE_TYPE("workflow_execute_type", "Workflow Execute Type", WorkflowExecuteType.class),
115118
WORKFLOW_INSTANCE_STATE("workflow_instance_state", "Workflow Instance State", WorkflowInstanceState.class),

scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/cdc/FlinkCDCVersion.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,9 @@ public String getValue() {
5656
public String getLabel() {
5757
return label;
5858
}
59+
60+
public static FlinkCDCVersion current() {
61+
return values()[values().length - 1];
62+
}
63+
5964
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package cn.sliew.scaleph.dao.entity.master.ws;
20+
21+
import cn.sliew.scaleph.common.dict.common.YesOrNo;
22+
import cn.sliew.scaleph.common.dict.flink.FlinkVersion;
23+
import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCVersion;
24+
import cn.sliew.scaleph.dao.entity.BaseDO;
25+
import com.baomidou.mybatisplus.annotation.TableField;
26+
import com.baomidou.mybatisplus.annotation.TableName;
27+
import io.swagger.v3.oas.annotations.media.Schema;
28+
import lombok.Data;
29+
30+
/**
31+
* flink artifact cdc
32+
*/
33+
@Data
34+
@TableName("ws_flink_artifact_cdc")
35+
@Schema(name = "WsFlinkArtifactCDC", description = "flink artifact cdc")
36+
public class WsFlinkArtifactCDC extends BaseDO {
37+
38+
private static final long serialVersionUID = 1L;
39+
40+
@Schema(description = "作业artifact id")
41+
@TableField("flink_artifact_id")
42+
private Long flinkArtifactId;
43+
44+
@TableField(exist = false)
45+
private WsFlinkArtifact wsFlinkArtifact;
46+
47+
@Schema(description = "flink版本")
48+
@TableField("flink_version")
49+
private FlinkVersion flinkVersion;
50+
51+
@Schema(description = "作业引擎")
52+
@TableField("flink_cdc_version")
53+
private FlinkCDCVersion flinkCDCVersion;
54+
55+
@TableField("dag_id")
56+
private Long dagId;
57+
58+
@Schema(description = "current artifact")
59+
@TableField("`current`")
60+
private YesOrNo current;
61+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package cn.sliew.scaleph.dao.mapper.master.ws;
20+
21+
import cn.sliew.scaleph.common.dict.flink.FlinkVersion;
22+
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkArtifactCDC;
23+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
24+
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
25+
import org.apache.ibatis.annotations.Param;
26+
import org.springframework.stereotype.Repository;
27+
28+
import java.util.List;
29+
30+
/**
31+
* flink artifact cdc Mapper 接口
32+
*/
33+
@Repository
34+
public interface WsFlinkArtifactCDCMapper extends BaseMapper<WsFlinkArtifactCDC> {
35+
36+
Page<WsFlinkArtifactCDC> list(Page<WsFlinkArtifactCDC> page,
37+
@Param("projectId") Long projectId,
38+
@Param("name") String name,
39+
@Param("flinkVersion") FlinkVersion flinkVersion);
40+
41+
List<WsFlinkArtifactCDC> listAll(@Param("projectId") Long projectId,
42+
@Param("name") String name);
43+
44+
WsFlinkArtifactCDC selectOne(@Param("id") Long id);
45+
46+
WsFlinkArtifactCDC selectCurrent(@Param("artifactId") Long artifactId);
47+
}

0 commit comments

Comments
 (0)