Skip to content

Commit

Permalink
Merge pull request #338 from 6QuizOnTheBlock/be/feat/#333-send-question
Browse files Browse the repository at this point in the history
hotfix: Consumer Type을 Producer와 맞추기
  • Loading branch information
dalcheonroadhead authored May 16, 2024
2 parents c764a9d + d1eb67a commit 75753fd
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class QuizReceive {
@Value("${kafka.group}")
String kafkaGroup;

@KafkaListener(topics = ConstantUtil.QUIZ_GAMER, containerFactory = "kafkaListenerContainerFactory", concurrency = "4")
@KafkaListener(topics = ConstantUtil.QUIZ_GAMER, containerFactory = "gamerKafkaListenerContainerFactory", concurrency = "4")
public void receivedGamer(GamerDTO gamer) {
log.info("/gamer/" + gamer.quizGameId());
log.info("게이머 상세={}", gamer.toString());
Expand All @@ -51,7 +51,7 @@ public void receivedGamer(GamerDTO gamer) {
(gamer.quizGameId(), redisUtil.getAllMemberScores(gamer.quizGameId())));
}

@KafkaListener(topics = ConstantUtil.QUIZ_QUESTION, containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(topics = ConstantUtil.QUIZ_QUESTION, containerFactory = "questionKafkaListenerContainerFactory")
public void receivedQuestion(QuestionRequest request) {
log.info("보내줘야 할 질문 상세={}", request.toString());

Expand All @@ -60,7 +60,7 @@ public void receivedQuestion(QuestionRequest request) {
request.quizGameId(), request.id()));
}

@KafkaListener(topics = ConstantUtil.QUIZ_ANSWER, containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(topics = ConstantUtil.QUIZ_ANSWER, containerFactory = "answerKafkaListenerContainerFactory")
public void receivedAnswer(AnswerResponse response) {
log.info("보내줘야할 답 상세={}", response.toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.google.common.collect.ImmutableMap;
import com.quiz.ourclass.domain.chat.dto.Message;
import com.quiz.ourclass.domain.quiz.dto.GamerDTO;
import com.quiz.ourclass.domain.quiz.dto.request.QuestionRequest;
import com.quiz.ourclass.domain.quiz.dto.response.AnswerResponse;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -53,4 +56,74 @@ public ConsumerFactory<String, Message> consumerFactory() {
consumerConfigurations, new StringDeserializer(), deserializer
);
}

// Kafka Consumer 설정
@Bean
public Map<String, Object> consumerConfigurations() {
return ImmutableMap.<String, Object>builder()
.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaUrl)
.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class)
.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class)
.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, kafkaGroup)
.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest")
.build();
}

// ConsumerFactory 설정
@Bean
public ConsumerFactory<String, GamerDTO> gamerConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigurations(),
new StringDeserializer(),
new JsonDeserializer<>(GamerDTO.class));
}

// Listener Container 설정
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GamerDTO> gamerKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GamerDTO> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(gamerConsumerFactory());
return factory;
}

// Question Consumer 설정
@Bean
public ConsumerFactory<String, QuestionRequest> questionConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigurations(),
new StringDeserializer(),
new JsonDeserializer<>(QuestionRequest.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, QuestionRequest> questionKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, QuestionRequest> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(questionConsumerFactory());
return factory;
}

// Answer Consumer 설정
@Bean
public ConsumerFactory<String, AnswerResponse> answerConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigurations(),
new StringDeserializer(),
new JsonDeserializer<>(AnswerResponse.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, AnswerResponse> answerKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, AnswerResponse> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(answerConsumerFactory());
return factory;
}


}

0 comments on commit 75753fd

Please sign in to comment.