Skip to content

Commit 5a51d81

Browse files
committed
完善文档和示例代码
1 parent 2ba1fb7 commit 5a51d81

File tree

17 files changed

+484
-147
lines changed

17 files changed

+484
-147
lines changed

.idea/encodings.xml

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

_doc/img/3.jpg

-11.6 KB
Loading

_doc/img/4.jpg

12 KB
Loading

_doc/img/kafkademo.jpg

30.1 KB
Loading

_doc/page/kafkademo.md

Whitespace-only changes.

_doc/readme.pptx

15 KB
Binary file not shown.

kafkademo/pom.xml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>org.wowtools.hppt</groupId>
8+
<artifactId>hppt</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>kafkademo</artifactId>
13+
14+
<dependencies>
15+
<dependency>
16+
<groupId>org.wowtools.hppt</groupId>
17+
<artifactId>run</artifactId>
18+
<version>1.0-SNAPSHOT</version>
19+
</dependency>
20+
<dependency>
21+
<groupId>org.apache.kafka</groupId>
22+
<artifactId>kafka-clients</artifactId>
23+
<version>3.4.0</version>
24+
</dependency>
25+
</dependencies>
26+
27+
</project>
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package org.wowtools.hppt.kafkademo;
2+
3+
import org.apache.kafka.clients.producer.KafkaProducer;
4+
import org.apache.kafka.clients.producer.Producer;
5+
import org.apache.kafka.clients.producer.ProducerRecord;
6+
import org.wowtools.hppt.run.sc.common.ClientSessionService;
7+
import org.wowtools.hppt.run.sc.pojo.ScConfig;
8+
import org.wowtools.hppt.run.ss.pojo.SsConfig;
9+
10+
import java.util.ArrayList;
11+
import java.util.Properties;
12+
13+
/**
14+
* 客户端,部署在电脑A上
15+
* @author liuyu
16+
* @date 2024/6/15
17+
*/
18+
public class ClientDemo extends ClientSessionService {
19+
//TODO 传输文件等大字节数传播的情况下,需处理kafka字节顺序消费问题
20+
public ClientDemo(ScConfig config) throws Exception {
21+
super(config);
22+
}
23+
24+
private KafkaUtil.BytesFunction sendToServer;
25+
private KafkaUtil.BytesFunction clientConsumer;
26+
@Override
27+
protected void connectToServer(ScConfig config, Cb cb) throws Exception {
28+
//初始化时构造好向kafka生产和消费数据的工具
29+
sendToServer = KafkaUtil.buildProducer(KafkaUtil.ClientSendTopic);
30+
31+
clientConsumer = (bytes) -> {
32+
//消费到客户端的数据,调用receiveServerBytes方法来接收
33+
try {
34+
receiveServerBytes(bytes);
35+
} catch (Exception e) {
36+
throw new RuntimeException(e);
37+
}
38+
};
39+
KafkaUtil.buildConsumer("client", KafkaUtil.ServerSendTopic, clientConsumer);
40+
cb.end();//调用end方法,通知框架连接完成
41+
}
42+
43+
@Override
44+
protected void sendBytesToServer(byte[] bytes) {
45+
sendToServer.f(bytes);
46+
}
47+
48+
public static void main(String[] args) throws Exception{
49+
ScConfig cfg = new ScConfig();
50+
cfg.clientId = "user1";
51+
ScConfig.Forward forward = new ScConfig.Forward();
52+
forward.localPort = 10022;
53+
forward.remoteHost = "wsl";
54+
forward.remotePort = 22;
55+
cfg.forwards = new ArrayList<>();
56+
cfg.forwards.add(forward);
57+
new ClientDemo(cfg).sync();
58+
}
59+
60+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package org.wowtools.hppt.kafkademo;
2+
3+
/**
4+
* @author liuyu
5+
* @date 2024/6/15
6+
*/
7+
public class KafkaCtx {
8+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.wowtools.hppt.kafkademo;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRecord;
4+
import org.apache.kafka.clients.consumer.ConsumerRecords;
5+
import org.apache.kafka.clients.consumer.KafkaConsumer;
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.Producer;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
10+
import java.time.Duration;
11+
import java.util.Collections;
12+
import java.util.Properties;
13+
14+
/**
15+
* kafka工具类
16+
*
17+
* @author liuyu
18+
* @date 2024/6/15
19+
*/
20+
public class KafkaUtil {
21+
22+
//客户端发数据的topic
23+
public static final String ClientSendTopic = "client-send";
24+
//服务端发数据的topic
25+
public static final String ServerSendTopic = "server-send";
26+
27+
28+
//基本的kafka连接配置
29+
private static Properties buildProperties() {
30+
Properties props = new Properties();
31+
props.put("bootstrap.servers", "wsl:9092"); // 部署在电脑C上的Kafka服务器地址
32+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
33+
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
34+
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
35+
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
36+
return props;
37+
}
38+
39+
@FunctionalInterface
40+
public interface BytesFunction {
41+
void f(byte[] bytes);
42+
}
43+
44+
/**
45+
* 构造一个向指定topic发送bytes数据的工具
46+
*
47+
* @param topic 主题
48+
* @return BytesFunction 调用其f(byte[] bytes)方法发送数据
49+
*/
50+
public static BytesFunction buildProducer(String topic) {
51+
Producer<String, byte[]> producer = new KafkaProducer<>(buildProperties());
52+
return (bytes -> {
53+
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, bytes);
54+
producer.send(record);
55+
});
56+
}
57+
58+
/**
59+
* 消费kafka数据
60+
*
61+
* @param groupId 消费者组
62+
* @param topic 主题
63+
* @param cb 消费到字节时回调
64+
*/
65+
public static void buildConsumer(String groupId, String topic, BytesFunction cb) {
66+
Properties props = buildProperties();
67+
props.put("group.id", groupId+System.currentTimeMillis());
68+
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
69+
// 订阅主题
70+
consumer.subscribe(Collections.singletonList(topic));
71+
// 消费消息
72+
Thread.startVirtualThread(()->{
73+
while (true) {
74+
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(10));
75+
for (ConsumerRecord<String, byte[]> record : records) {
76+
byte[] value = record.value();
77+
cb.f(value);
78+
}
79+
}
80+
});
81+
}
82+
}

0 commit comments

Comments
 (0)