programing

SpringBoot @SqsListener - 작동하지 않음 - 예외 - 작업 거부예외.

cafebook 2023. 10. 25. 23:44
반응형

SpringBoot @SqsListener - 작동하지 않음 - 예외 - 작업 거부예외.

이미 큐에 5000개의 메시지가 있는 AWS SQS가 있습니다(Sample Message는 'Hello @ 1'과 같습니다). SpringBoot Application을 만들었고 Component Class 중 하나에서 SQS에서 메시지를 읽을 수 있는 메서드를 만듭니다.

package com.example.aws.sqs.service;

import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class MessageReceiverService {   

@SqsListener(value = { "${cloud.aws.sqs.url}" }, deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
public void readMessage(String message){
    log.info("Reading Message... {}", message);
}

}

나의 메인 스프링부트 클래스

@SpringBootApplication 
public class AwsSqsApplicationConsumer {
public static void main(String[] args) {
    SpringApplication.run(AwsSqsApplicationConsumer.class, args);
}
}

응용프로그램이 실행될 때 발생하는 예외:

s.c.a.m.l.SimpleMessageListenerContainer : An Exception occurred while polling queue '<my sqs name>'. The failing operation will be retried in 10000 milliseconds
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:309) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286) ~[spring-cloud-aws-messaging-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2 rejected from java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 2, queued tasks = 0, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_65]
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:306) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
... 6 common frames omitted

사용자 지정 Executor 서비스를 구성하지 않습니다.사전 구성된 Spring Beans.springBootVersion = '2.0.3을 사용합니다.RELEASE' 스프링 클라우드 버전 = '핀칠리'.릴리스'

최대 메시지 수를 설정하면 문제가 해결되는 것 같습니다.

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS){
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQS);
    factory.setMaxNumberOfMessages(10);
    return factory;
}

나는 그것이 봄의 버그나 감독이라고 생각합니다.이 문제는 다음의 기본값에서 비롯됩니다.

public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {

    private static final int DEFAULT_WORKER_THREADS = 2;

그리고.

abstract class AbstractMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {
    private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;

maxNumberOfMessages가 설정되어 있지 않으면 10을 SQS에서 꺼낼 메시지 수로 사용하고 2를 작업 실행기의 작업자 수로 사용합니다.즉, 메시지를 한 번에 3개 이상 끌면 해당 예외가 적용됩니다.maxNumberOfMessages를 값(임의 값)으로 수동으로 설정하면 예상되는 대로 값을 동기화하는 두 위치가 모두 사용됩니다.

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            SimpleMessageListenerContainerFactory factory, QueueMessageHandler messageHandler)
    {
        SimpleMessageListenerContainer container = factory.createSimpleMessageListenerContainer();
        container.setMaxNumberOfMessages(5);
        container.setMessageHandler(messageHandler);
        return container;
    }

수신기 스레드 구성에 문제가 있습니다.다음 참조

...
ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]]
...

기본 스레드 풀 크기가 원하는 크기보다 작습니다.

스프링 애플리케이션에 다음 구성 추가

@Configuration
public class TasksConfiguration implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(5); // TODO: Load this from configuration
        taskScheduler.initialize();
        taskRegistrar.setTaskScheduler(taskScheduler);
    }
}

이제 이러한 작업을 처리할 수 있습니다.

P.S. 이전에 어떤 업무가 거부되었든 일정 기간이 지나면 나중에 처리될 것입니다.

편집: 줄 서 있는 숫자에 사람들이 겁먹고 있는 것 같습니다..setPoolSize(5000). 요구사항에 맞는 번호를 선택할 수 있는 구성 가능한 번호입니다.답변을 위해 더 작은 숫자로 줄이고 있습니다.

이전 답변에 설명을 추가하여 문제가 발생하는 이유 및 MaxNumberOfMessages 메시지의 솔루션 설정이 작동하는 이유를 자세히 설명할 수 없습니다.아래 내용이 모든 것을 명확히 하는 데 도움이 되기를 바랍니다.

SimpleMessageListenerContainerThreadPoolTaskExecutor는 코어 풀 크기가 2 스레드, 최대 풀 크기가 3 스레드, 대기열 용량이 0이 되도록 구성됩니다.그러나 Amazon SQS에 대한 여론조사에서 반환할 메시지의 기본 최대 수는 10으로 설정됩니다.즉, 한 번의 여론조사에서 10개의 메시지를 사용할 수 있어야 하며 이를 처리하기에는 스레드가 충분하지 않습니다.그래서.RejectedExecutionException던집니다.

구성하기setMaxNumberOfMessages10시까지SimpleMessageListenerContainerFactory는 최대 스레드 풀 크기를 11로 설정합니다. 이 경우 충분한 스레드를 사용할 수 있어야 합니다.대기열 용량을 설정하지 않습니다.

대기열 용량을 설정하기 위해 별도의 TaskExecutor를 초기화하고 설정할 수 있습니다.SimpleMessageListenerContainerFactory콩은 다음과 같습니다.

@Bean(name = "sqsAsyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor(@Value("${external.aws.sqs.core-thread-count}") int coreThreadCount,
                                           @Value("${external.aws.sqs.max-thread-count}") int maxThreadCount,
                                           @Value("${external.aws.sqs.queue-capacity}") int queueCapacity) {
    ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
    asyncTaskExecutor.setCorePoolSize(coreThreadCount);
    asyncTaskExecutor.setMaxPoolSize(maxThreadCount);
    asyncTaskExecutor.setQueueCapacity(queueCapacity);
    asyncTaskExecutor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
    asyncTaskExecutor.initialize();
    return asyncTaskExecutor;
}

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS, @Qualifier("sqsAsyncTaskExecutor") AsyncTaskExecutor asyncTaskExecutor) {
    SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
    simpleMessageListenerContainerFactory.setTaskExecutor(asyncTaskExecutor);
    return simpleMessageListenerContainerFactory;
}

내가 사용하는 가치는 핵심이었습니다.ThreadCount = 5, maxThreadCount = 20, queueCapacity = 10입니다.

이미 말했듯이 SimpleMessageListenerContainerFactory에서 setMaxNumberOfMessages를 10으로 설정하면 단일 요청에서 가져온 일괄 처리된 메시지를 모두 처리할 수 있을 것 같습니다.그러나 Task Executor에 대한 보다 정확한 제어가 필요하다고 생각되면 이 구성도 작동합니다.

MessageReceiver 서비스에 @EnableSqs 주석 추가

안녕하세요 저는 스프링 리스너를 이용해서 이 문제를 해결했습니다.다음은 코드입니다. 도움이 되기를 바랍니다.

다음 솔루션에서는 모든 빈 초기화가 완료되면 풀 크기가 더 큰 새 태스크 실행기가 할당됩니다.

@Component
public class PostBeansConstructionListener{

    @EventListener
    public void handleContextRefreshedEvent(ContextRefreshedEvent event){
        final ApplicationContext applicationContext = event.getApplicationContext();
        final SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
        setAsyncTaskExecutor(simpleMessageListenerContainer);
    }

    private void setAsyncTaskExecutor(SimpleMessageListenerContainer simpleMessageListenerContainer) {
        try{
            simpleMessageListenerContainer.setTaskExecutor(getAsyncExecutor());
        }catch(Exception ex){
            throw new RuntimeException("Not able to create Async Task Executor for SimpleMessageListenerContainer.", ex);
        }
    }

    public AsyncTaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
        executor.initialize();
        return executor;
    }
}

언급URL : https://stackoverflow.com/questions/51373082/springboot-sqslistener-not-working-with-exception-taskrejectedexception

반응형