Skip to content

[이용구] sprint12 #506

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: part3-이용구
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'com.sprint.mission'
version = '2.3-M11'
version = '3.0-M12'

java {
toolchain {
Expand All @@ -26,6 +26,9 @@ repositories {

dependencies {

//웹 소켓
implementation 'org.springframework.boot:spring-boot-starter-websocket'

// Caffeine 캐시
implementation 'org.springframework.boot:spring-boot-starter-cache'
implementation 'com.github.ben-manes.caffeine:caffeine'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.sprint.mission.discodeit.async;

import com.sprint.mission.discodeit.dto.binaryContent.BinaryContentDto;
import com.sprint.mission.discodeit.entity.BinaryContentUploadStatus;
import com.sprint.mission.discodeit.entity.NotificationType;
import com.sprint.mission.discodeit.exception.file.FileSaveFailedException;
import com.sprint.mission.discodeit.mapper.BinaryContentMapper;
import com.sprint.mission.discodeit.notification.NotificationEvent;
import com.sprint.mission.discodeit.notification.NotificationEventPublisher;
import com.sprint.mission.discodeit.repository.jpa.BinaryContentRepository;
import com.sprint.mission.discodeit.service.basic.SseService;
import com.sprint.mission.discodeit.storage.BinaryContentStorage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -27,18 +30,28 @@ public class BinaryContentUploadExecutor {
private final BinaryContentRepository binaryContentRepository;
private final NotificationEventPublisher notificationEventPublisher;

private final BinaryContentMapper binaryContentMapper;
private final SseService sseService;

@Async
@Retryable(
value = FileSaveFailedException.class,
maxAttempts = 3,
backoff = @Backoff(delay = 1000)
)
public void uploadAsync(UUID id, byte[] bytes, String requestId) {
public void uploadAsync(UUID id, byte[] bytes, UUID requestId) {
try {
MDC.put("requestId", requestId);
storage.put(id, bytes);
binaryContentRepository.updateUploadStatus(id, BinaryContentUploadStatus.SUCCESS);
log.info("업로드 성공 - id: {}", id);

//파일 업로드 상태 변경 이벤트 전송
binaryContentRepository.findById(id).ifPresent(content -> {
BinaryContentDto dto = binaryContentMapper.toDto(content);
sseService.sendBinaryStatus(requestId, dto);
});

} finally {
MDC.clear();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.sprint.mission.discodeit.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class SchedulingConfig {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.sprint.mission.discodeit.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/sub");
registry.setApplicationDestinationPrefixes("/pub"); // pub 메시지 -> @MessageMapping 라우팅됨
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//웹소켓 연결 endpoint /ws
registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.sprint.mission.discodeit.controller;

import com.sprint.mission.discodeit.dto.user.UserDto;
import com.sprint.mission.discodeit.service.basic.SseService;
import lombok.RequiredArgsConstructor;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequiredArgsConstructor
public class SseController {

private final SseService sseService;

@GetMapping("/api/sse")
public SseEmitter connect(
@AuthenticationPrincipal UserDto user,
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {

return sseService.subscribe(user.getId(), lastEventId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.sprint.mission.discodeit.controller;

import com.sprint.mission.discodeit.dto.message.MessageCreateDTO;
import com.sprint.mission.discodeit.dto.message.MessageDto;
import com.sprint.mission.discodeit.service.MessageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

import java.util.List;

@Slf4j
@RequiredArgsConstructor
@Controller
public class WebSocketMessageController {

private final MessageService messageService;
private final SimpMessagingTemplate messagingTemplate;

@MessageMapping("/messages")
public void handleTextMessage(MessageCreateDTO request) {
MessageDto saved = messageService.create(request, List.of());

String dest = "/sub/channels." + request.getChannelId() + ".messages";
messagingTemplate.convertAndSend(dest, saved);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.time.Instant;
import java.util.UUID;


@Builder
public record NotificationDto(
UUID id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.sprint.mission.discodeit.notification;

import com.sprint.mission.discodeit.entity.Notification;
import com.sprint.mission.discodeit.mapper.NotificationMapper;
import com.sprint.mission.discodeit.service.basic.BasicNotificationService;
import com.sprint.mission.discodeit.service.basic.SseService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
Expand All @@ -14,12 +17,17 @@
public class NotificationEventListener {

private final BasicNotificationService notificationService;
private final NotificationMapper notificationMapper;
private final SseService sseService;

@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handle(NotificationEvent event) {
try {
notificationService.send(event);
Notification notification = notificationService.send(event);

sseService.sendNotification(event.receiverId(), notificationMapper.toDto(notification));

} catch (Exception e) {
log.error("알림 저장 실패: {}", event, e);
throw e; // @Retryable을 사용하면 자동 재시도 가능
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class BasicChannelService implements ChannelService {
private final ChannelMapper channelMapper;
private final JwtSessionRepository jwtSessionRepository;

private final SseService sseService;


@CacheEvict(cacheNames = "channelsByUser", allEntries = true)
@Override
Expand Down Expand Up @@ -69,6 +71,10 @@ public ChannelDto create(ChannelCreatePrivateDTO dto) {
})
.toList();

//SSE 이벤트 전송
dto.getParticipantIds().forEach(userId ->
sseService.sendChannelRefresh(userId, channel.getId()));

Instant lastMessageAt = findLastMessageAt(channel);

log.info("private 채널 생성 완료 id: {}", channel.getId());
Expand Down Expand Up @@ -129,6 +135,13 @@ public ChannelDto update(UUID id, ChannelUpdateDTO dto) {
}
findChannel.setChannel(dto.getNewName(), dto.getNewDescription());

//public 채널 수정이니 모든 사용자에게 채널 목록 갱신 알림 전송
List<UUID> allUserIds = userRepository.findAll().stream()
.map(User::getId)
.toList();
allUserIds.forEach(userId ->
sseService.sendChannelRefresh(userId, findChannel.getId()));

Instant lastMessageAt = findLastMessageAt(findChannel);
log.info("채널 수정 완료 id: {}", findChannel.getId());
return channelMapper.toDto(findChannel, List.of(), lastMessageAt);
Expand All @@ -137,10 +150,23 @@ public ChannelDto update(UUID id, ChannelUpdateDTO dto) {
@CacheEvict(cacheNames = "channelsByUser", allEntries = true)
@Override
public void delete(UUID id) {
if (!channelRepository.existsById(id)) {
throw new ChannelNotFoundException(id);
}
Channel channel = channelRepository.findById(id)
.orElseThrow(() -> new ChannelNotFoundException(id));

channelRepository.deleteById(id);

//채널 삭제시, 해당 채널이 private 면 참가자만, public이면 전체 사용자
List<UUID> targetUserIds = channel.getChannelType() == ChannelType.PRIVATE
? readStatusRepository.findAllByChannel(channel).stream()
.map(rs -> rs.getUser().getId())
.toList()
: userRepository.findAll().stream()
.map(User::getId)
.toList();

targetUserIds.forEach(userId ->
sseService.sendChannelRefresh(userId, channel.getId()));

log.info("채널 삭제 완료 id: {}", id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public MessageDto create(MessageCreateDTO dto,
@Override
public void afterCommit() {
String requestId = MDC.get("requestId");
uploadExecutor.uploadAsync(savedBinaryContent.getId(), attachmentRequest.getBytes(), requestId);
uploadExecutor.uploadAsync(savedBinaryContent.getId(), attachmentRequest.getBytes(), dto.getAuthorId());
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void delete(UUID notificationId, UUID currentUserId) {
)
@CacheEvict(cacheNames = "notificationsByUser", key = "#event.receiverId")
@Transactional
public void send(NotificationEvent event) {
public Notification send(NotificationEvent event) {
Notification notification = Notification.builder()
.receiver(User.withId(event.receiverId())) // User.withId()는 직접 만든 정적 팩토리 메서드 (프록시 생성용)
.title(event.title())
Expand All @@ -68,6 +68,6 @@ public void send(NotificationEvent event) {
.targetId(event.targetId())
.build();

notificationRepository.save(notification);
return notificationRepository.save(notification);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class BasicUserService implements UserService {

private final NotificationEventPublisher notificationEventPublisher;

private final SseService sseService;


@CacheEvict(cacheNames = "allUsers", allEntries = true)
@Override
Expand All @@ -76,11 +78,40 @@ public UserDto create(UserCreateDTO dto,
//cascade persist
User saveUser = userRepository.save(user);


// 유저가 생성되고 아이디를 받기 위해서 생성 이후 비동기 호출
optionalProfileCreateRequest.ifPresent(request -> {
byte[] bytes = request.getBytes();
registerUploadAfterCommit(saveUser.getId(), nullableProfile.getId(), bytes);
});

//사용자 목록 갱신 알림
notifyAllUsersToRefreshUserList();

log.info("사용자 생성 완료 id: {}", saveUser.getId());

return userMapper.toDto(saveUser, isUserOnline(saveUser));
}

private void registerUploadAfterCommit(UUID userId, UUID binaryContentId, byte[] bytes) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
uploadExecutor.uploadAsync(binaryContentId, bytes, userId);
}
}
);
}

private void notifyAllUsersToRefreshUserList() {
List<UUID> allUserIds = userRepository.findAll().stream()
.map(User::getId)
.toList();

sseService.sendUserRefreshToAll(allUserIds);
}

@Override
@Transactional(readOnly = true)
public UserDto find(UUID id) {
Expand Down Expand Up @@ -121,6 +152,15 @@ public UserDto update(UUID id, UserUpdateDTO dto,

log.info("사용자 수정 완료 id: {}", findUser.getId());

//비동기 호출
optionalProfileCreateRequest.ifPresent(request -> {
byte[] bytes = request.getBytes();
registerUploadAfterCommit(findUser.getId(), nullableProfile.getId(), bytes);
});

//사용자 목록 갱신 알림
notifyAllUsersToRefreshUserList();

return userMapper.toDto(findUser, isUserOnline(findUser));
}

Expand All @@ -132,6 +172,10 @@ public void delete(UUID id) {
throw new UserNotFoundException(id);
}
userRepository.deleteById(id);

//사용자 목록 갱신 알림
notifyAllUsersToRefreshUserList();

log.info("사용자 삭제 완료 id: {}", id);
}

Expand Down Expand Up @@ -167,7 +211,7 @@ public BinaryContent saveBinaryFile(
binaryContent.updateUploadStatus(BinaryContentUploadStatus.WAITING);
BinaryContent saved = binaryContentRepository.save(binaryContent);

//비동기 업로드 등록 -> 트랜잭션 커밋 이후 실행
/* //비동기 업로드 등록 -> 트랜잭션 커밋 이후 실행
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
Expand All @@ -176,7 +220,7 @@ public void afterCommit() {
uploadExecutor.uploadAsync(saved.getId(), bytes, requestId);
}
}
);
);*/

return saved;
})
Expand Down
Loading