ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [AWS] spring-cloud-aws-messaging을 이용한 FIFO 유형의 AWS SQS 연동하기
    클라우드/aws 2021. 1. 21. 11:00

    이번 시간에는 Spring Boot 프로젝트에서 spring-cloud-aws-messaging를 이용해서 FIFO 유형의 AWS SQS를 연동하는 방법에 대해서 알아보도록 하겠습니다.

     

    SQS란?

    SQS(Simple Queue Service)는 마이크로 서비스와 분산 시스템, 그리고 서버리스 애플리케이션을 쉽게 분리하고 확장할 수 있는 ‘완전관리형 메시지 대기열 서비스’입니다.

     

    01. dependency 추가

    build.gradle 설정을 통해서 SQS 관련 dependencies를 추가합니다.

    • spring-cloud-starter-aws
    • spring-cloud-aws-messaging
    plugins {
        id 'java'
        id 'eclipse'
        id 'idea'
        id 'org.springframework.boot'
        id 'io.spring.dependency-management'
    }
    
    sourceCompatibility = 1.8
    targetCompatibility = 1.8
    
    repositories {
        mavenCentral()
    }
    
    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-web'
    
        // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-aws-messaging
        implementation group: 'org.springframework.cloud', name: 'spring-cloud-aws-messaging', version: '2.2.1.RELEASE'
        // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-aws
        implementation group: 'org.springframework.cloud', name: 'spring-cloud-starter-aws', version: '2.2.1.RELEASE'
    
        implementation 'org.projectlombok:lombok:1.18.12'
        annotationProcessor 'org.projectlombok:lombok:1.18.12'
    }
    
    bootJar {
        launchScript()
    }

     

    02. SQS 설정 정보 application.yml에 추가

    cloud:
      aws:
        credentials:
          access-key: xxxxxx
          secret-key: xxxxxx
        region:
          static: ap-northeast-2 # AWS SQS의 리전정보
        stack:
          auto: false
        sqs:
          queue:
            name: MyQueue.fifo	# AWS에서 생성한 Queue 이름
            url: https://sqs.ap-northeast-2.amazonaws.com/xxxx/MyQueue.fifo # 생성한 SQS 큐 url
    logging:
      pattern:
        console: '%d{yyyy-MM-dd HH:mm:ss} %-5level --- [%thread] %logger{35} : %msg %n'

     

    03. SQS Configuration 클래스 생성

    • application.yml에 있는 access-key, secret-key 값을 가지고 AWSCredentialsProvider 빈 생성
    • Message 송신에 사용되는 AmazonSQS 빈 생성 시 해당 credentials 정보와 region 정보 설정
    • Message 수신시 사용되는 Listener 관련 설정

     

    @Configuration
    public class AwsSQSConfigure {
    
        @Value("${cloud.aws.credentials.access-key}")
        private String accessKey;
    
        @Value("${cloud.aws.credentials.secret-key}")
        private String secretKey;
    
        @Value("${cloud.aws.region.static}")
        private String region;
    
        @Bean
        @Primary
        public AWSCredentialsProvider awsCredentialsProvider() {
            return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
        }
    
        @Bean
        public AmazonSQS amazonSQSClient() {
            AmazonSQSClientBuilder builder = AmazonSQSClientBuilder.standard().withCredentials(awsCredentialsProvider());
            builder.withRegion(region);
            return builder.build();
        }
    
        @Bean
        public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
            SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
            factory.setAmazonSqs(amazonSQSAsync);   // SQS API를 가지고 통신을 하는 container에 의해 사용되어지는 AmazonSQSAsync를 설정
            factory.setMaxNumberOfMessages(10);     // 한 번 poll하는 동안 조회되어져야하는 메세지의 최대 개수, 높을 수록 poll request 요청을 줄여준다.
            factory.setWaitTimeOut(20);             // queue에 메세지가 없을 때, queue로 들어오는 새로운 메세지에 대해 poll 요청 시 대기하는 timeout 시간
            factory.setTaskExecutor(messageThreadPoolTaskExecutor());   // 메세지를 poll하고 처리하는 handler 함수 설정
            return factory;
        }
    
        @Bean
        public ThreadPoolTaskExecutor messageThreadPoolTaskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setThreadNamePrefix("sqs-task-");
            taskExecutor.setCorePoolSize(20);
            taskExecutor.setMaxPoolSize(20);
            taskExecutor.afterPropertiesSet();
            return taskExecutor;
        }
    }

     

    04. 메세지 전달 서비스 생성

    @Setter
    @Getter
    @Component
    @ConfigurationProperties(prefix = "cloud.aws.sqs.queue")
    public class AmazonSQSProperties {
        private String url;
    }
    import com.amazonaws.services.sqs.model.SendMessageResult;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.guide.aws.sqs.dto.Message;
    
    public interface AmazonSQSSender {
        SendMessageResult sendMessage(Message message) throws JsonProcessingException;
    }
    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.model.SendMessageRequest;
    import com.amazonaws.services.sqs.model.SendMessageResult;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.guide.aws.sqs.dto.Message;
    import com.guide.aws.sqs.properties.AmazonSQSProperties;
    import lombok.RequiredArgsConstructor;
    import org.springframework.stereotype.Component;
    
    import java.util.UUID;
    
    @RequiredArgsConstructor
    @Component
    public class AmazonSQSSenderImpl implements AmazonSQSSender{
        private final ObjectMapper objectMapper;
        private final AmazonSQS amazonSQS;
        private final AmazonSQSProperties properties;
    
        @Override
        public SendMessageResult sendMessage(Message msg) throws JsonProcessingException {
            SendMessageRequest sendMessageRequest = new SendMessageRequest(properties.getUrl(), objectMapper.writeValueAsString(msg));
            return amazonSQS.sendMessage(sendMessageRequest);
        }
    }

    05. 테스트용 Controller 생성

    @Slf4j
    @RequiredArgsConstructor
    @RestController
    public class MessageAPI {
        private final ObjectMapper objectMapper;
        private final AmazonSQSSender amazonSQSSender;
    
        @PostMapping("/send")
        public String send(@RequestBody Message message) throws JsonProcessingException {
            amazonSQSSender.sendMessage(message);
            return "OK";
        }
    }

     

    어플리케이션 실행 시 후 메세지를 전송할 때 아래와 같은 예외가 발생됩니다.

    FIFO 유형의 대기열에 메세지를 전송시에 MessageGroupId를 추가해서 전달해야 아래와 같은 예외가 발생하지 않습니다.

    com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageGroupId.

     

    MessageGroupId를 추가 후 다시 실행했을 때 다시 아래와 같은 예외가 발생하였습니다.

    이와 같은 예외를 처리하기 위해서는 아래 둘 중 하나의 조건을 만족시켜줍니다.

    • Queue 생성 시 콘텐츠 기반 중복 제거 옵션을 ON
    • 메세지 생성 시 MessageDuplicationId 값을 추가
    com.amazonaws.services.sqs.model.AmazonSQSException: The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly

     

    Queue 생성 시 콘텐츠 기반 중복 제거 옵션 ON

     

     

    위 예외들을 전부 처리한 메세지 전달 서비스

    @RequiredArgsConstructor
    @Component
    public class AmazonSQSSenderImpl implements AmazonSQSSender{
        private final ObjectMapper objectMapper;
        private final AmazonSQS amazonSQS;
        private final AmazonSQSProperties properties;
    
        @Override
        public SendMessageResult sendMessage(Message msg) throws JsonProcessingException {
            SendMessageRequest sendMessageRequest = new SendMessageRequest(properties.getUrl(), objectMapper.writeValueAsString(msg))
                    .withMessageGroupId("live-commerce")
                    .withMessageDeduplicationId(UUID.randomUUID().toString());
            return amazonSQS.sendMessage(sendMessageRequest);
        }
    }

     

    06. 메세지 수신 Listener 생성

    메세지를 수신하기 위한 Listener를 테스트용 Controller에 추가

    @Slf4j
    @RequiredArgsConstructor
    @RestController
    public class MessageAPI {
        private final ObjectMapper objectMapper;
        private final AmazonSQSSender amazonSQSSender;
    
        @PostMapping("/send")
        public String send(@RequestBody Message message) throws JsonProcessingException {
            amazonSQSSender.sendMessage(message);
            return "OK";
        }
    
        @SqsListener(value = "${cloud.aws.sqs.queue.name}")
        private void receiveMessage(@Headers Map<String, String> header, @Payload String message) throws JsonProcessingException {
            Message readValue = objectMapper.readValue(message, Message.class);
            System.out.println(readValue.getType());
            System.out.println(readValue.getMsg());
        }
    }

     

     

    댓글

Designed by Tistory.