-
Notifications
You must be signed in to change notification settings - Fork 48
[#46] 비동기적인 Push 알람 서비스 구현 #48
base: develop
Are you sure you want to change the base?
Changes from all commits
8108c9c
f91e91e
6252939
63e9b35
4451c96
dd0fcde
f45050a
631569a
6e14c40
c6280de
1c5b247
3ece9d5
a1d3104
71c41a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package com.flab.makedel.config; | ||
|
||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.scheduling.annotation.EnableAsync; | ||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | ||
|
||
/* | ||
스프링의 @Async를 사용할 때 비동기처리를 새로운 스레드 풀에서 해주기 위한 설정입니다. | ||
이 설정이 없다면 SimpleAsyncTaskExecutor를 사용하는데 이는 새로운 비동기 작업을 | ||
스레드 풀에서 처리하는 것이 아니라 새로운 스레드를 매번 생성하여 작업을 수행시킵니다. | ||
또한 스레드 관리를 직접 할 수 없어 위험할 수 있습니다. | ||
따라서 밑에 설정에서 스레드 풀을 빈으로 설정해서 @Async 로직이 수행될 때 | ||
이 스레드 풀을 이용하도록 설정해줍니다. | ||
*/ | ||
|
||
@Configuration | ||
@EnableAsync | ||
public class SpringAsyncConfig { | ||
|
||
private static final int CORE_POOL_SIZE = 5; | ||
private static final int MAX_POOL_SIZE = 500; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 이거는 이렇게 높혀주신 이유가 있으실까요? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 라이더들에게 지역별로 푸쉬 알람을 보내긴 하지만 갑자기 푸쉬 알람이 폭증할 가능성이 있기 때문에 max_pool_size를 늘려서 푸쉬 알람 보내는 기능에 병목이 일어날 확률이 적어지도록 만들었습니다. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 이런 값은 성능테스트하면서 조절하는게 일반적인데요~ 추후에 한번 테스트해보시면서 조절해봐도 좋을 것 같네요 |
||
private static final int QUEUE_CAPACITY = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. queue capacity가 양수이면 LinkedBlockingQueue로 지정되고 0이하라면 SynchronousQueue를 이용합니다. LinkedBlockingQueue는 큐의 크기가 제한되지 않는 큐입니다. 크기를 수동으로 설정을 할 수도 있습니다. 하지만 큐에 작업이 계속 쌓이게 되면 푸쉬 알람에 병목이 생길 수 있습니다. SynchronousQueue는 큐라기보다는 작업이 들어오면 그것을 스레드에 넘겨주는 역할을 합니다. 큐에 쌓지 않고 바로 프로듀서에서 생성한 작업을 컨슈머인 스레드에게 직접 전달합니다. 쉬고 있는 스레드에게 처리할 작업을 직접 넘겨주므로 효율적입니다. 또한 대기중인 스레드가 없는 상태에서 스레드의 개수가 max pool size보다 작다면 새로운 스레드를 생성해 동작시킵니다. 따라서 푸쉬알람이 폭증해도 대비할 수 있습니다. 하지만 정해둔 max pool size보다 커진다면 abort policy를 이용해 푸쉬 알람 보내는 것을 거부하고 exception을 던집니다. |
||
private static final int KEEP_ALIVE_SECONDS = 60; | ||
private static final String NAME_PREFIX = "springAsyncTask-"; | ||
|
||
@Bean(name = "springAsyncTask") | ||
public Executor threadPoolTaskExecutor() { | ||
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 자바에서 기본으로 제공하는 factory 메소드가 있습니다. 혹시 커스터마이징을 의도하셨다면 이유가 있으실까요? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. newFixedThreadPool은 스레드에 할당된 시간 제한이 무제한으로 설정되어 있으므로 또한 newCachedThreadPool은 스레드 풀의 최대 크기가 Integer.MAX_VALUE로 지정되어있어 적용하기 알맞지 않다고 생각했습니다. 왜냐하면 푸쉬알람이 아무리 많이와도 스레드의 개수를 무작정 많이 늘려도 모두 처리할 수 없기 때문에 적당한 크기의 최대 스레드 개수를 지정하기 위해 커스터마이징을 하게 되었습니다. 하지만 synchronousQueue를 이용하여 스레드를 큐에 넣지 않고 바로 스레드로 작업을 넘겨주기 때문에 푸쉬알람이 수십개가 한번에 온다면 max_pool_size를 늘려놓고 이 방식을 쓰는 것도 대안이 될 수 있습니다. 푸쉬 알람 보내는 것을 비동기 작업으로 처리할 것이기 때문에 newCachedThreadPool이 알맞긴 하지만 시간제한을 30초로 두고 직접 코어 사이즈나 큐개수등을 제한하기 위해 커스터마이징을 하게되었습니다. |
||
taskExecutor.setCorePoolSize(CORE_POOL_SIZE); | ||
taskExecutor.setMaxPoolSize(MAX_POOL_SIZE); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. core pool size와 max pool size를 다르게 해주신 이유는 어떤 상황을 가정해서 다르게 주신 것일까요? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. core pool size는 스레드 풀 안에 유지되는 스레드 개수이고 만약 core pool size를 높게 둔다면 스레드를 여러개만들어 두니 바로 바로 처리할 수 있지만 스레드를 많이 만들어 두는 것 자체가 메모리 낭비일 수 있으므로 작게 두었습니다. 이 둘을 다르게 설정해준 이유는 한번에 갑자기 많은 푸쉬알람을 보내야 할 때 core pool size의 스레드 개수가 알람을 보내고 response가 올떄 까지 blocking이 되면 작업 큐에 있는 작업들이 처리되지 못하여 core pool size의 스레드를 할당받지 못하고 기다리게 될 것이므로 max pool size만큼 스레드를 늘려 푸쉬 알람을 보내기 위해서 입니다. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 스레드를 생성하고 소멸시키면서 생기는 오버헤드보다는 메모리를 적게 사용하는 것을 선택하신 것 같네요~ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 푸쉬메세지를 보낼 때 Runtime 클래스를 이용해 메모리 사용량을 계산해보았습니다. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 매번 다르게 나와서 정확한 수치 측정은 불가능하나 Maxpool을 500으로 잡았을때와 5로 잡았을때 메모리 차이는 50~100MB정도의 차이는 꾸준히 있었습니다. |
||
taskExecutor.setQueueCapacity(QUEUE_CAPACITY); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. core_pool_size만큼 스레드 풀은 스레드를 만들어놓고 스레드를 처리하다 더 많은 작업량이 들어오면 작업 큐에 넣어놓고 작업 큐는 대기하고 있고 core_pool_size의 스레드들이 작업은 완료하면 작업 큐에서 한개씩 스레드를 할당해서 스레드를 재사용합니다. 만약 푸쉬알람이 폭증해서 동시에 몇십개를 보내야 하는 상황에서 QUEUE_CAPACITY가 20이고 core_pool_size로 정해놓은 스레드들이 3이라면 3개의 스레드가 푸쉬알람을 보내고 response를 받고 이 일 처리를 끝내고 이 3개의 스레드로 20개의 작업큐의 작업을 해야 하므로 작업큐에서도 blocking이 있을것이고 core_pool_size만큼의 스레드들도 푸쉬알람을 보내고 response까지 받을때까진 blocking이 있을것이므로 작업큐에서 대기하는 작업들은 굉장히 늦게 처리될 것입니다. 따라서 queue_capactiy를 아예 낮게 두어 core_pool_size의 스레드가 꽉차고 queue_capacity도 꽉찬다면 작업 큐에서 계속 작업들이 기다리는 것을 방지하고자 아예 max_pool_size를 높게 두어 한번에 몇십개씩 푸쉬알람을 보내야 할 때 스레드의 개수를 늘려 푸쉬알람 병목을 대비하였습니다. 기존 스레드풀의 core_pool_size만큼의 스레드가 모두 일을하고있고 작업큐에 작업이 쌓였다면 하지만 queue_capacity가 낮다면 작업큐까지 다 꽉차버리면 max_pool_size까지 스레드를 필요한만큼 늘려버리기 때문에 바로바로 일처리를 할 수 있고 병목이 생길 확률이 줄어듭니다. 시간은 적게 걸리지만 새로운 스레드들을 생성하기 때문에 새로운 스레드를 만드는데 메모리를 더 쓰게 될 것입니다. |
||
taskExecutor.setThreadNamePrefix(NAME_PREFIX); | ||
taskExecutor.setWaitForTasksToCompleteOnShutdown(false); | ||
taskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS); | ||
taskExecutor.setAllowCoreThreadTimeOut(true); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 이 메소드를 통해 풀 내부의 모든 스레드가 시간 제한에 걸리도록 하여 특정 크기의 스레드와 크기가 제한된 작업 큐를 사용하는 스레드 풀에서 처리할 작업이 없을 때 스레드가 점차 사라지도록 하였습니다. |
||
taskExecutor.setRejectedExecutionHandler(new AbortPolicy()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. abort policy는 최대 스레드 크기까지 쓰고 작업큐까지 꽉차면 RejectedExecutionException 예외를 던집니다. 이 예외를 잡아서 직접 대응해줘야합니다. 만약 푸쉬 알람을 보내다 실패한다면 이 예외를 푸쉬 알람을 보내려고 한 사용자에게 throw하여 알려야 한다고 생각하여 abortpolicy로 설정하였습니다. |
||
return taskExecutor; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package com.flab.makedel.dto; | ||
|
||
import lombok.Builder; | ||
import lombok.Getter; | ||
|
||
@Getter | ||
@Builder | ||
public class PushMessageDTO { | ||
|
||
private final String title; | ||
|
||
private final String content; | ||
|
||
private final OrderReceiptDTO orderReceipt; | ||
|
||
private final String createdAt; | ||
|
||
public static final String RIDER_MESSAGE_TITLE = "배차 요청"; | ||
public static final String RIDER_MESSAGE_CONTENT = "근처 가게에서 주문이 승인된 후 배차 요청이 도착했습니다. 승인하시겠습니까?"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 만약 이 어플리케이션을 베트남에서도 서비스하게 된다면 이 상수가 큰 걸림돌이 되지 않을까요? 이럴 땐 어떻게 해야할까요? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. resources 디렉토리 안에 베트남의 언어로 출력할 메세지를 작성하고 resource bundle로 묶습니다. 그 다음 ResourceBundleMessageSoure빈이 리소스 번들을 메세지 소스로 읽어와서 이를 Local.VIETNAM 이런식으로 출력해줄 수 있습니다. |
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package com.flab.makedel.service; | ||
|
||
import com.flab.makedel.dao.DeliveryDAO; | ||
import com.flab.makedel.dto.PushMessageDTO; | ||
import com.google.auth.oauth2.GoogleCredentials; | ||
import com.google.firebase.FirebaseApp; | ||
import com.google.firebase.FirebaseOptions; | ||
import com.google.firebase.messaging.BatchResponse; | ||
import com.google.firebase.messaging.FirebaseMessaging; | ||
import com.google.firebase.messaging.FirebaseMessagingException; | ||
import com.google.firebase.messaging.Message; | ||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import javax.annotation.PostConstruct; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.log4j.Log4j2; | ||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.core.io.ClassPathResource; | ||
import org.springframework.data.redis.core.RedisTemplate; | ||
import org.springframework.data.redis.core.SessionCallback; | ||
import org.springframework.scheduling.annotation.Async; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 이 커밋은 별 의미가 없는 것 같은데 추가하신 이유가 있으실까요? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 뭔가 오류가 있었던것 같습니다. |
||
import org.springframework.stereotype.Service; | ||
|
||
@Service | ||
@Log4j2 | ||
public class PushService { | ||
|
||
private final String firebaseConfigPath; | ||
private final DeliveryDAO deliveryDAO; | ||
|
||
public PushService(@Value("${firebase.config.path}") String firebaseConfigPath, | ||
DeliveryDAO deliveryDAO) { | ||
this.firebaseConfigPath = firebaseConfigPath; | ||
this.deliveryDAO = deliveryDAO; | ||
} | ||
|
||
@PostConstruct | ||
public void init() throws IOException { | ||
FirebaseOptions options = new FirebaseOptions.Builder() | ||
.setCredentials(GoogleCredentials | ||
.fromStream(new ClassPathResource(firebaseConfigPath).getInputStream())) | ||
.build(); | ||
if (FirebaseApp.getApps().isEmpty()) { | ||
FirebaseApp.initializeApp(options); | ||
} | ||
} | ||
|
||
@Async("springAsyncTask") | ||
public void sendMessages(List<Message> messages) { | ||
FirebaseMessaging.getInstance().sendAllAsync(messages); | ||
} | ||
|
||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍