Skip to content

Commit

Permalink
Merge pull request #339 from 6QuizOnTheBlock/be/feat/#333-send-question
Browse files Browse the repository at this point in the history
hotfix: ๋ฐฐํฌ ํ™˜๊ฒฝ์—์„œ Consumer ๋กœ ๋ฉ”์„ธ์ง€๊ฐ€ ์•ˆ ์˜ค๋Š” ๋ฌธ์ œ ํ•ด๊ฒฐ
  • Loading branch information
dalcheonroadhead authored May 16, 2024
2 parents 75753fd + 1b8422f commit 24d2841
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class QuizReceive {
@Value("${kafka.group}")
String kafkaGroup;

@KafkaListener(topics = ConstantUtil.QUIZ_GAMER, containerFactory = "gamerKafkaListenerContainerFactory", concurrency = "4")
@KafkaListener(topics = ConstantUtil.QUIZ_GAMER, groupId = "group-id", containerFactory = "gamerListenerContainerFactory", concurrency = "4")
public void receivedGamer(GamerDTO gamer) {
log.info("/gamer/" + gamer.quizGameId());
log.info("๊ฒŒ์ด๋จธ ์ƒ์„ธ={}", gamer.toString());
Expand All @@ -51,7 +51,7 @@ public void receivedGamer(GamerDTO gamer) {
(gamer.quizGameId(), redisUtil.getAllMemberScores(gamer.quizGameId())));
}

@KafkaListener(topics = ConstantUtil.QUIZ_QUESTION, containerFactory = "questionKafkaListenerContainerFactory")
@KafkaListener(topics = ConstantUtil.QUIZ_QUESTION, groupId = "group-id", containerFactory = "questionRequestContainerFactory")
public void receivedQuestion(QuestionRequest request) {
log.info("๋ณด๋‚ด์ค˜์•ผ ํ•  ์งˆ๋ฌธ ์ƒ์„ธ={}", request.toString());

Expand All @@ -60,7 +60,7 @@ public void receivedQuestion(QuestionRequest request) {
request.quizGameId(), request.id()));
}

@KafkaListener(topics = ConstantUtil.QUIZ_ANSWER, containerFactory = "answerKafkaListenerContainerFactory")
@KafkaListener(topics = ConstantUtil.QUIZ_ANSWER, groupId = "group-id", containerFactory = "answerResponseContainerFactory")
public void receivedAnswer(AnswerResponse response) {
log.info("๋ณด๋‚ด์ค˜์•ผํ•  ๋‹ต ์ƒ์„ธ={}", response.toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,73 +57,81 @@ consumerConfigurations, new StringDeserializer(), deserializer
);
}

// Kafka Consumer ์„ค์ •
@Bean
public Map<String, Object> consumerConfigurations() {
return ImmutableMap.<String, Object>builder()
.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaUrl)
.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class)
.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class)
.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, kafkaGroup)
.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest")
.build();
ConcurrentKafkaListenerContainerFactory<String, GamerDTO> gamerListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GamerDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(gamerDTOConsumerFactory());
return factory;
}

// ConsumerFactory ์„ค์ •
@Bean
public ConsumerFactory<String, GamerDTO> gamerConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigurations(),
new StringDeserializer(),
new JsonDeserializer<>(GamerDTO.class));
}
public ConsumerFactory<String, GamerDTO> gamerDTOConsumerFactory() {
JsonDeserializer<GamerDTO> deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("*");

// Listener Container ์„ค์ •
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GamerDTO> gamerKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GamerDTO> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(gamerConsumerFactory());
return factory;
}
Map<String, Object> consumerConfigurations =
ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl)
.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroup)
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build();

// Question Consumer ์„ค์ •
@Bean
public ConsumerFactory<String, QuestionRequest> questionConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigurations(),
new StringDeserializer(),
new JsonDeserializer<>(QuestionRequest.class));
consumerConfigurations, new StringDeserializer(), deserializer
);
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String, QuestionRequest> questionKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, QuestionRequest> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(questionConsumerFactory());
ConcurrentKafkaListenerContainerFactory<String, QuestionRequest> questionRequestContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, QuestionRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(questionRequestConsumerFactory());
return factory;
}

// Answer Consumer ์„ค์ •
@Bean
public ConsumerFactory<String, AnswerResponse> answerConsumerFactory() {
public ConsumerFactory<String, QuestionRequest> questionRequestConsumerFactory() {
JsonDeserializer<QuestionRequest> deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("*");

Map<String, Object> consumerConfigurations =
ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl)
.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroup)
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build();

return new DefaultKafkaConsumerFactory<>(
consumerConfigurations(),
new StringDeserializer(),
new JsonDeserializer<>(AnswerResponse.class));
consumerConfigurations, new StringDeserializer(), deserializer
);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, AnswerResponse> answerKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, AnswerResponse> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(answerConsumerFactory());
ConcurrentKafkaListenerContainerFactory<String, AnswerResponse> answerResponseContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, AnswerResponse> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(answerResponseConsumerFactory());
return factory;
}

@Bean
public ConsumerFactory<String, AnswerResponse> answerResponseConsumerFactory() {
JsonDeserializer<AnswerResponse> deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("*");

Map<String, Object> consumerConfigurations =
ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl)
.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroup)
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build();
return new DefaultKafkaConsumerFactory<>(
consumerConfigurations, new StringDeserializer(), deserializer
);
}
}

0 comments on commit 24d2841

Please sign in to comment.