Skip to content

Commit

Permalink
新增K8S集群配置
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Jan 23, 2022
1 parent edb23f4 commit fbd80e5
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,32 @@ public TestResult testGateway(ClusterConfiguration clusterConfiguration) {
clusterConfiguration.parseConfig();
Map<String, Object> config = clusterConfiguration.getConfig();
GatewayConfig gatewayConfig = new GatewayConfig();
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(),
config.get("hadoopConfigPath").toString()));
if(config.containsKey("hadoopConfigPath")) {
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(),
config.get("hadoopConfigPath").toString()));
}else {
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(),
""));
}
if(config.containsKey("flinkConfig")){
gatewayConfig.setFlinkConfig(FlinkConfig.build((Map<String, String>)config.get("flinkConfig")));
}
if(Asserts.isEqualsIgnoreCase(clusterConfiguration.getType(),"Yarn")){
gatewayConfig.setType(GatewayType.YARN_PER_JOB);
}else if(Asserts.isEqualsIgnoreCase(clusterConfiguration.getType(),"Kubernetes")){
gatewayConfig.setType(GatewayType.KUBERNETES_APPLICATION);
Map kubernetesConfig = (Map) config.get("kubernetesConfig");
if(kubernetesConfig.containsKey("kubernetes.namespace")) {
gatewayConfig.getFlinkConfig().getConfiguration().put("kubernetes.namespace", kubernetesConfig.get("kubernetes.namespace").toString());
}
if(kubernetesConfig.containsKey("kubernetes.cluster-id")) {
gatewayConfig.getFlinkConfig().getConfiguration().put("kubernetes.cluster-id", kubernetesConfig.get("kubernetes.cluster-id").toString());
}
if(kubernetesConfig.containsKey("kubernetes.container.image")) {
gatewayConfig.getFlinkConfig().getConfiguration().put("kubernetes.container.image", kubernetesConfig.get("kubernetes.container.image").toString());
}
}
return JobManager.testGateway(gatewayConfig);
}
Expand Down
24 changes: 21 additions & 3 deletions dlink-core/src/main/java/com/dlink/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,15 @@ public void setSessionConfig(SessionConfig sessionConfig){

public void buildGatewayConfig(Map<String,Object> config){
gatewayConfig = new GatewayConfig();
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(),
config.get("hadoopConfigPath").toString()));
if(config.containsKey("hadoopConfigPath")) {
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(),
config.get("hadoopConfigPath").toString()));
}else {
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(),
""));
}
AppConfig appConfig = new AppConfig();
if(config.containsKey("userJarPath") && Asserts.isNotNullString((String) config.get("userJarPath"))){
appConfig.setUserJarPath(config.get("userJarPath").toString());
Expand All @@ -176,6 +182,18 @@ public void buildGatewayConfig(Map<String,Object> config){
if(config.containsKey("flinkConfig") && Asserts.isNotNullMap((Map<String, String>) config.get("flinkConfig"))){
gatewayConfig.setFlinkConfig(FlinkConfig.build((Map<String, String>)config.get("flinkConfig")));
}
if(config.containsKey("kubernetesConfig")){
Map kubernetesConfig = (Map) config.get("kubernetesConfig");
if(kubernetesConfig.containsKey("kubernetes.namespace")) {
gatewayConfig.getFlinkConfig().getConfiguration().put("kubernetes.namespace", kubernetesConfig.get("kubernetes.namespace").toString());
}
if(kubernetesConfig.containsKey("kubernetes.cluster-id")) {
gatewayConfig.getFlinkConfig().getConfiguration().put("kubernetes.cluster-id", kubernetesConfig.get("kubernetes.cluster-id").toString());
}
if(kubernetesConfig.containsKey("kubernetes.container.image")) {
gatewayConfig.getFlinkConfig().getConfiguration().put("kubernetes.container.image", kubernetesConfig.get("kubernetes.container.image").toString());
}
}
}

public void addGatewayConfig(List<Map<String, String>> configList){
Expand Down
1 change: 1 addition & 0 deletions dlink-web/src/components/Studio/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const Studio = (props: any) => {
listSession(dispatch);
showJars(dispatch);
showEnv(dispatch);
onResize();
}, []);

const onClick = () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {Form, Button, Input, Modal, Select,Divider,Space,Switch} from 'antd';
import { MinusCircleOutlined, PlusOutlined } from '@ant-design/icons';
import type {ClusterConfigurationTableListItem} from "@/pages/ClusterConfiguration/data";
import {getConfig, getConfigFormValues} from "@/pages/ClusterConfiguration/function";
import {FLINK_CONFIG_LIST, HADOOP_CONFIG_LIST} from "@/pages/ClusterConfiguration/conf";
import {FLINK_CONFIG_LIST, HADOOP_CONFIG_LIST, KUBERNETES_CONFIG_LIST} from "@/pages/ClusterConfiguration/conf";
import type {Config} from "@/pages/ClusterConfiguration/conf";
import {testClusterConfigurationConnect} from "@/pages/ClusterConfiguration/service";

Expand Down Expand Up @@ -39,6 +39,10 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
modalVisible,
} = props;

const onValuesChange = (change: any, all: any) => {
setFormVals({...formVals,...change});
};

const buildConfig = (config: Config[]) =>{
const itemList: JSX.Element[] = [];
config.forEach(configItem => {
Expand Down Expand Up @@ -79,6 +83,7 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
<Option value="Kubernetes">Flink On Kubernetes</Option>
</Select>
</Form.Item>
{formValsPara.type=='Yarn'?<>
<Divider>Hadoop 配置</Divider>
<Form.Item
name="hadoopConfigPath"
Expand Down Expand Up @@ -123,7 +128,45 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
</>
)}
</Form.List>
</Form.Item>
</Form.Item></>:undefined}
{formValsPara.type=='Kubernetes'?<>
<Divider>Kubernetes 配置</Divider>
{buildConfig(KUBERNETES_CONFIG_LIST)}
<Form.Item
label="其他配置"
>
<Form.List name="kubernetesConfigList">
{(fields, { add, remove }) => (
<>
{fields.map(({ key, name, fieldKey, ...restField }) => (
<Space key={key} style={{ display: 'flex' }} align="baseline">
<Form.Item
{...restField}
name={[name, 'name']}
fieldKey={[fieldKey, 'name']}
>
<Input placeholder="name" />
</Form.Item>
<Form.Item
{...restField}
name={[name, 'value']}
fieldKey={[fieldKey, 'value']}
>
<Input placeholder="value" />
</Form.Item>
<MinusCircleOutlined onClick={() => remove(name)} />
</Space>
))}
<Form.Item>
<Button type="dashed" onClick={() => add()} block icon={<PlusOutlined />}>
添加一个自定义项
</Button>
</Form.Item>
</>
)}
</Form.List>
</Form.Item>
</>:undefined}
<Divider>Flink 配置</Divider>
<Form.Item
name="flinkLibPath"
Expand Down Expand Up @@ -250,6 +293,7 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
{...formLayout}
form={form}
initialValues={getConfigFormValues(formVals)}
onValuesChange={onValuesChange}
>
{renderContent(formVals)}
</Form>
Expand Down
21 changes: 21 additions & 0 deletions dlink-web/src/pages/ClusterConfiguration/conf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@ export const HADOOP_CONFIG_LIST: Config[] = [{
lable: 'ha.zookeeper.quorum',
placeholder: '192.168.123.1:2181,192.168.123.2:2181,192.168.123.3:2181',
}];
export const KUBERNETES_CONFIG_LIST: Config[] = [{
name: 'kubernetes.namespace',
lable: 'kubernetes.namespace',
placeholder: 'dlink',
},{
name: 'kubernetes.cluster-id',
lable: 'kubernetes.cluster-id',
placeholder: 'dlink',
},{
name: 'kubernetes.container.image',
lable: 'kubernetes.container.image',
placeholder: 'dlink',
}];
export const FLINK_CONFIG_LIST: Config[] = [{
name: 'jobmanager.memory.process.size',
lable: 'jobmanager.memory.process.size',
Expand Down Expand Up @@ -44,6 +57,14 @@ export function HADOOP_CONFIG_NAME_LIST () {
return list;
}

export function KUBERNETES_CONFIG_NAME_LIST () {
const list: string[] = [];
KUBERNETES_CONFIG_LIST.forEach(item => {
list.push(item.name);
});
return list;
}

export function FLINK_CONFIG_NAME_LIST() {
const list: string[] = [];
FLINK_CONFIG_LIST.forEach(item => {
Expand Down
46 changes: 34 additions & 12 deletions dlink-web/src/pages/ClusterConfiguration/function.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
import {FLINK_CONFIG_NAME_LIST, HADOOP_CONFIG_NAME_LIST} from "@/pages/ClusterConfiguration/conf";
import {
FLINK_CONFIG_NAME_LIST,
HADOOP_CONFIG_NAME_LIST,
KUBERNETES_CONFIG_NAME_LIST
} from "@/pages/ClusterConfiguration/conf";

export function getConfig(values:any) {
let hadoopConfig = addValueToMap(values,HADOOP_CONFIG_NAME_LIST());
addListToMap(values.hadoopConfigList,hadoopConfig);
let flinkConfig = addValueToMap(values,FLINK_CONFIG_NAME_LIST());
addListToMap(values.flinkConfigList,flinkConfig);
return {
hadoopConfigPath:values.hadoopConfigPath,
flinkLibPath:values.flinkLibPath,
flinkConfigPath:values.flinkConfigPath,
hadoopConfig:hadoopConfig,
flinkConfig:flinkConfig,
};
if(values.type=='Yarn') {
let hadoopConfig = addValueToMap(values, HADOOP_CONFIG_NAME_LIST());
addListToMap(values.hadoopConfigList, hadoopConfig);
return {
hadoopConfigPath:values.hadoopConfigPath,
flinkLibPath:values.flinkLibPath,
flinkConfigPath:values.flinkConfigPath,
hadoopConfig,
flinkConfig,
};
}else if(values.type=='Kubernetes') {
let kubernetesConfig = addValueToMap(values, KUBERNETES_CONFIG_NAME_LIST());
addListToMap(values.kubernetesConfigList, kubernetesConfig);
return {
flinkLibPath:values.flinkLibPath,
flinkConfigPath:values.flinkConfigPath,
kubernetesConfig,
flinkConfig,
};
}
}

type ConfigItem = {
Expand All @@ -27,6 +42,9 @@ function addListToMap(list:[ConfigItem],config:{}){

function addValueToMap(values:{},keys: string []){
let config = {};
if(!values){
return config;
}
for(let i in keys){
config[keys[i]]=values[keys[i]];
}
Expand All @@ -53,16 +71,20 @@ export function getConfigFormValues(values:any) {
'flinkConfigPath',
]);
let hadoopConfig = addValueToMap(config.hadoopConfig,HADOOP_CONFIG_NAME_LIST());
let kubernetesConfig = addValueToMap(config.kubernetesConfig,KUBERNETES_CONFIG_NAME_LIST());
let flinkConfig = addValueToMap(config.flinkConfig,FLINK_CONFIG_NAME_LIST());
let hadoopConfigList = addMapToList(config.hadoopConfig,HADOOP_CONFIG_NAME_LIST());
let kubernetesConfigList = addMapToList(config.kubernetesConfig,KUBERNETES_CONFIG_NAME_LIST());
let flinkConfigList = addMapToList(config.flinkConfig,FLINK_CONFIG_NAME_LIST());
return {
...formValues,
...configValues,
...hadoopConfig,
hadoopConfigList:hadoopConfigList,
...kubernetesConfig,
hadoopConfigList,
kubernetesConfigList,
...flinkConfig,
flinkConfigList:flinkConfigList
flinkConfigList
}
}

Expand Down
5 changes: 4 additions & 1 deletion dlink-web/src/pages/Welcome.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ export default (): React.ReactNode => {
</ul>
</Paragraph>
</Timeline.Item>
<Timeline.Item><Text code>0.5.1</Text> <Text type="secondary">2022-01-23</Text>
<Timeline.Item><Text code>0.5.1</Text> <Text type="secondary">2022-01-??</Text>
<p> </p>
<Paragraph>
<ul>
Expand Down Expand Up @@ -619,6 +619,9 @@ export default (): React.ReactNode => {
<li>
<Link>优化 作业配置查看及全屏开发按钮</Link>
</li>
<li>
<Link>新增 K8S集群配置</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
Expand Down

0 comments on commit fbd80e5

Please sign in to comment.