-
[AWS] spring-cloud-aws-messaging을 이용한 FIFO 유형의 AWS SQS 연동하기클라우드/aws 2021. 1. 21. 11:00
이번 시간에는 Spring Boot 프로젝트에서 spring-cloud-aws-messaging를 이용해서 FIFO 유형의 AWS SQS를 연동하는 방법에 대해서 알아보도록 하겠습니다.
SQS란?
SQS(Simple Queue Service)는 마이크로 서비스와 분산 시스템, 그리고 서버리스 애플리케이션을 쉽게 분리하고 확장할 수 있는 ‘완전관리형 메시지 대기열 서비스’입니다.
01. dependency 추가
build.gradle 설정을 통해서 SQS 관련 dependencies를 추가합니다.
- spring-cloud-starter-aws
- spring-cloud-aws-messaging
plugins { id 'java' id 'eclipse' id 'idea' id 'org.springframework.boot' id 'io.spring.dependency-management' } sourceCompatibility = 1.8 targetCompatibility = 1.8 repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-aws-messaging implementation group: 'org.springframework.cloud', name: 'spring-cloud-aws-messaging', version: '2.2.1.RELEASE' // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-aws implementation group: 'org.springframework.cloud', name: 'spring-cloud-starter-aws', version: '2.2.1.RELEASE' implementation 'org.projectlombok:lombok:1.18.12' annotationProcessor 'org.projectlombok:lombok:1.18.12' } bootJar { launchScript() }
02. SQS 설정 정보 application.yml에 추가
cloud: aws: credentials: access-key: xxxxxx secret-key: xxxxxx region: static: ap-northeast-2 # AWS SQS의 리전정보 stack: auto: false sqs: queue: name: MyQueue.fifo # AWS에서 생성한 Queue 이름 url: https://sqs.ap-northeast-2.amazonaws.com/xxxx/MyQueue.fifo # 생성한 SQS 큐 url logging: pattern: console: '%d{yyyy-MM-dd HH:mm:ss} %-5level --- [%thread] %logger{35} : %msg %n'
03. SQS Configuration 클래스 생성
- application.yml에 있는 access-key, secret-key 값을 가지고 AWSCredentialsProvider 빈 생성
- Message 송신에 사용되는 AmazonSQS 빈 생성 시 해당 credentials 정보와 region 정보 설정
- Message 수신시 사용되는 Listener 관련 설정
@Configuration public class AwsSQSConfigure { @Value("${cloud.aws.credentials.access-key}") private String accessKey; @Value("${cloud.aws.credentials.secret-key}") private String secretKey; @Value("${cloud.aws.region.static}") private String region; @Bean @Primary public AWSCredentialsProvider awsCredentialsProvider() { return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)); } @Bean public AmazonSQS amazonSQSClient() { AmazonSQSClientBuilder builder = AmazonSQSClientBuilder.standard().withCredentials(awsCredentialsProvider()); builder.withRegion(region); return builder.build(); } @Bean public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) { SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory(); factory.setAmazonSqs(amazonSQSAsync); // SQS API를 가지고 통신을 하는 container에 의해 사용되어지는 AmazonSQSAsync를 설정 factory.setMaxNumberOfMessages(10); // 한 번 poll하는 동안 조회되어져야하는 메세지의 최대 개수, 높을 수록 poll request 요청을 줄여준다. factory.setWaitTimeOut(20); // queue에 메세지가 없을 때, queue로 들어오는 새로운 메세지에 대해 poll 요청 시 대기하는 timeout 시간 factory.setTaskExecutor(messageThreadPoolTaskExecutor()); // 메세지를 poll하고 처리하는 handler 함수 설정 return factory; } @Bean public ThreadPoolTaskExecutor messageThreadPoolTaskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setThreadNamePrefix("sqs-task-"); taskExecutor.setCorePoolSize(20); taskExecutor.setMaxPoolSize(20); taskExecutor.afterPropertiesSet(); return taskExecutor; } }
04. 메세지 전달 서비스 생성
@Setter @Getter @Component @ConfigurationProperties(prefix = "cloud.aws.sqs.queue") public class AmazonSQSProperties { private String url; }
import com.amazonaws.services.sqs.model.SendMessageResult; import com.fasterxml.jackson.core.JsonProcessingException; import com.guide.aws.sqs.dto.Message; public interface AmazonSQSSender { SendMessageResult sendMessage(Message message) throws JsonProcessingException; }
import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.guide.aws.sqs.dto.Message; import com.guide.aws.sqs.properties.AmazonSQSProperties; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import java.util.UUID; @RequiredArgsConstructor @Component public class AmazonSQSSenderImpl implements AmazonSQSSender{ private final ObjectMapper objectMapper; private final AmazonSQS amazonSQS; private final AmazonSQSProperties properties; @Override public SendMessageResult sendMessage(Message msg) throws JsonProcessingException { SendMessageRequest sendMessageRequest = new SendMessageRequest(properties.getUrl(), objectMapper.writeValueAsString(msg)); return amazonSQS.sendMessage(sendMessageRequest); } }
05. 테스트용 Controller 생성
@Slf4j @RequiredArgsConstructor @RestController public class MessageAPI { private final ObjectMapper objectMapper; private final AmazonSQSSender amazonSQSSender; @PostMapping("/send") public String send(@RequestBody Message message) throws JsonProcessingException { amazonSQSSender.sendMessage(message); return "OK"; } }
어플리케이션 실행 시 후 메세지를 전송할 때 아래와 같은 예외가 발생됩니다.
FIFO 유형의 대기열에 메세지를 전송시에 MessageGroupId를 추가해서 전달해야 아래와 같은 예외가 발생하지 않습니다.
com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageGroupId.
MessageGroupId를 추가 후 다시 실행했을 때 다시 아래와 같은 예외가 발생하였습니다.
이와 같은 예외를 처리하기 위해서는 아래 둘 중 하나의 조건을 만족시켜줍니다.
- Queue 생성 시 콘텐츠 기반 중복 제거 옵션을 ON
- 메세지 생성 시 MessageDuplicationId 값을 추가
com.amazonaws.services.sqs.model.AmazonSQSException: The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly
Queue 생성 시 콘텐츠 기반 중복 제거 옵션 ON
위 예외들을 전부 처리한 메세지 전달 서비스
@RequiredArgsConstructor @Component public class AmazonSQSSenderImpl implements AmazonSQSSender{ private final ObjectMapper objectMapper; private final AmazonSQS amazonSQS; private final AmazonSQSProperties properties; @Override public SendMessageResult sendMessage(Message msg) throws JsonProcessingException { SendMessageRequest sendMessageRequest = new SendMessageRequest(properties.getUrl(), objectMapper.writeValueAsString(msg)) .withMessageGroupId("live-commerce") .withMessageDeduplicationId(UUID.randomUUID().toString()); return amazonSQS.sendMessage(sendMessageRequest); } }
06. 메세지 수신 Listener 생성
메세지를 수신하기 위한 Listener를 테스트용 Controller에 추가
@Slf4j @RequiredArgsConstructor @RestController public class MessageAPI { private final ObjectMapper objectMapper; private final AmazonSQSSender amazonSQSSender; @PostMapping("/send") public String send(@RequestBody Message message) throws JsonProcessingException { amazonSQSSender.sendMessage(message); return "OK"; } @SqsListener(value = "${cloud.aws.sqs.queue.name}") private void receiveMessage(@Headers Map<String, String> header, @Payload String message) throws JsonProcessingException { Message readValue = objectMapper.readValue(message, Message.class); System.out.println(readValue.getType()); System.out.println(readValue.getMsg()); } }
'클라우드 > aws' 카테고리의 다른 글
[AWS] RDS에 TimeZone 설정 방법 (0) 2021.02.04 [AWS] ElastiCache Redis 생성 및 접속 테스트 절차 (0) 2021.01.23 [AWS] S3 정적 웹 호스팅 (0) 2020.11.30 [AWS] Amazon ECS 서비스 설정 - ECS 작업 및 클러스터 (14) (0) 2020.10.16 [AWS] Amazon ECS 서비스 설정 - AWS CLI 설치 및 ECR 이미지 업로드 (13) (0) 2020.10.16