Asynchronous retries support #81

Closed
zeldigas opened this issue Jun 7, 2017 · 9 comments


@zeldigas

zeldigas commented Jun 7, 2017

Hello

Are there any plans to support asynchronous operations? We are using Spring Retry, but it is not applicable to this case at all because of its blocking nature. It would be really great to have something like:

//service
CompletableFuture<Data> loadUnreliableData()

//retry block
AsyncRetryOperations retry = <magic here>
retry.execute(service::loadUnreliableData).thenAccept(data -> println("At last data is loaded!"))

I found a project targeting this problem: https://github.com/nurkiewicz/async-retry, but it would be great to have this in Spring Retry to avoid working with multiple similar libraries.

@mguilherme

Hi,
It might not be exactly what you want, but I've done something similar. You can use submit() instead of execute() to get a Future<?> back; I've only used execute() in this small example:

@Slf4j
@EnableRetry
@SpringBootApplication
public class SpringRetry {

    public static void main(String[] args) {
        SpringApplication.run(SpringRetry.class, args);
    }

    @Bean
    public AsyncTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);

        return executor;
    }

    @Bean
    public RetryTemplate retryTemplate() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(5);

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1500); // 1.5 seconds

        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(retryPolicy);
        template.setBackOffPolicy(backOffPolicy);
        template.registerListener(new CustomRetryListener());

        return template;
    }

    @Bean
    public CommandLineRunner commandLineRunner(RetryTemplate retryTemplate, OtherService otherService) {
        return args -> {

            asyncExecutor().execute(() -> {
                retryTemplate.execute(context -> {
                    otherService.otherMethod();
                    return null;
                }, context -> {
                    log.error("Recovering from {}", context.getLastThrowable().getMessage());
                    return null;
                });

            });

        };
    }

    @Service
    public static class OtherService { // static, so that component scanning can register it as a bean

        public void otherMethod() {
            log.info("Executing other method");
            throw new RuntimeException("other runtime exception");
        }
    }

    public class CustomRetryListener extends RetryListenerSupport {

        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            log.warn("Retrying...");
            super.onError(context, callback, throwable);
        }
    }

}
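
For readers who want to see the submit() variant mentioned above, here is a minimal sketch using only java.util.concurrent, not Spring's AsyncTaskExecutor or RetryTemplate. The names `SubmitRetrySketch`, `callWithRetry`, and `submitWithRetry` are hypothetical and exist only for this illustration. The retry loop still blocks the worker thread it runs on; only the caller is freed, because it receives a Future right away.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitRetrySketch {

    // Hypothetical helper: calls the task up to maxAttempts times,
    // sleeping backOffMillis between failed attempts (blocking the current thread).
    static <T> T callWithRetry(Callable<T> task, int maxAttempts, long backOffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis);
                }
            }
        }
        throw last; // all attempts failed
    }

    // Submits the whole retry loop to the pool; the caller gets a Future immediately.
    static <T> Future<T> submitWithRetry(ExecutorService pool, Callable<T> task,
                                         int maxAttempts, long backOffMillis) {
        Callable<T> withRetry = () -> callWithRetry(task, maxAttempts, backOffMillis);
        return pool.submit(withRetry);
    }

    // Demo: a task that fails twice and succeeds on the third call.
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger calls = new AtomicInteger();
        try {
            Future<String> future = submitWithRetry(pool, () -> {
                int n = calls.incrementAndGet();
                if (n < 3) {
                    throw new IllegalStateException("attempt " + n + " failed");
                }
                return "succeeded on attempt " + n;
            }, 5, 10);
            return future.get(); // the caller decides when (or whether) to wait
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This keeps one pool thread occupied for the full duration of all attempts and back-off pauses, which is the limitation the original question is about.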

@adelinor

adelinor commented Aug 22, 2017

Hi,

The question raised in this issue is how to handle a retry when the service method is itself asynchronous:

public class OtherService {

    public Future<Void> otherMethod() {
        // Does an http request call ...
    }
}
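
To make the difficulty concrete, here is a minimal sketch in plain Java with no Spring Retry involved. Everything in it is an assumption made for illustration: `otherMethod` is a stand-in that fails through its returned future, `retrySync` is a hypothetical blocking retry loop, and CompletableFuture is used as the Future implementation. Because the method returns normally and the failure only shows up later inside the future, a retry loop that reacts to thrown exceptions never fires a second attempt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class AsyncFailureNotRetried {

    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for an asynchronous service method: it returns normally,
    // and the failure is reported only through the returned future.
    static CompletableFuture<Void> otherMethod() {
        calls.incrementAndGet();
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("http call failed"));
        return future;
    }

    // Hypothetical blocking retry loop: retries only when the task itself throws.
    static <T> T retrySync(Supplier<T> task, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    // Returns how many times otherMethod() was actually invoked.
    static int attemptsMade() {
        calls.set(0);
        CompletableFuture<Void> future = retrySync(AsyncFailureNotRetried::otherMethod, 3);
        if (!future.isCompletedExceptionally()) {
            throw new IllegalStateException("the stand-in is expected to fail");
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("attempts made: " + attemptsMade());
    }
}
```

The retry decision therefore has to be attached to the completion of the future rather than to the method call.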

@dsyer
Member

dsyer commented Dec 12, 2017

Duplicates #25 and #92. See also #31.

@whpgit

whpgit commented Jun 8, 2023

Here is my design idea; it may not cover everything.

Design considerations:

  1. A thread pool is needed to submit tasks

  2. The retry callback interface must be supplied by the developer

  3. In terms of performance, the pressure on the thread pool needs to be considered

  4. Monitoring

  5. Cleaning up the task backlog

  6. Avoiding the damage caused by retry storms

So far, only the thread pool and the callback are implemented. The code is as follows:
package com.example.myretry;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@Slf4j
public class AsyncRetryDemo {

@Resource
@Qualifier("retryThreadPoolExecutor")
ThreadPoolExecutor retryThreadPoolExecutor;


public final static int DEFAULT_MAX_ATTEMPTS = 3;


@Bean
public ThreadPoolExecutor retryThreadPoolExecutor() {
    LinkedBlockingQueue<Runnable> asyncRetryThreadPoolQueue = new LinkedBlockingQueue<>(50000);

    return new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            asyncRetryThreadPoolQueue,
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "AsyncRetryExecutor_" + this.threadIndex.incrementAndGet());
                }
            });
}


public void execute(SendCallback<String> sendCallback) {
    retryThreadPoolExecutor.execute(() -> {
        retry(sendCallback);
    });
}


public void retry(SendCallback<String> sendCallback) {
    // attempt runs from 1 to DEFAULT_MAX_ATTEMPTS inclusive
    for (int attempt = 1; canRetry(attempt); attempt++) {
        try {
            int i = 1 / 0; // placeholder for the real call; always fails here
        } catch (Exception e) {
            log.error("retry exception ", e);
            // report the failure only once, after the last attempt
            if (isExhausted(attempt)) {
                sendCallback.onException(e);
            }
            continue;
        }

        RetryResult<String> successResult = new RetryResult<>();
        successResult.setData("success");
        sendCallback.onSuccess(successResult);
        break;
    }
}

private boolean canRetry(int curr) {
    return DEFAULT_MAX_ATTEMPTS >= curr;
}

private boolean isExhausted(int curr) {
    return DEFAULT_MAX_ATTEMPTS <= curr;
}


public interface SendCallback<T> {
    void onSuccess(final RetryResult<T> sendResult);
    void onException(final Throwable e);
}

public static class RetryResult<T> {

    private T data;

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }
}

}

@artembilan
Member

@whpgit ,

what you are showing is not an async retry. It is just task scheduling, and then you do regular blocking retries inside that task.

In my opinion, a true async retry is when we reschedule the task back to an executor whenever canRetry is true.

But that's a completely different story, and it could probably be implemented with the CompletableFuture API.
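
As a rough illustration of that idea (not something Spring Retry provides, and not a proposed design), here is a minimal sketch that hands each new attempt back to a ScheduledExecutorService instead of sleeping on a thread. `ReschedulingRetrySketch`, `retryAsync`, and `attempt` are hypothetical names, and the fixed delay is an assumption made to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ReschedulingRetrySketch {

    // Returns a future that completes with the first successful result,
    // or with the last error once maxAttempts attempts have failed.
    static <T> CompletableFuture<T> retryAsync(Supplier<CompletableFuture<T>> action,
                                               int maxAttempts, long delayMillis,
                                               ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(action, 1, maxAttempts, delayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> action, int attemptNo,
                                    int maxAttempts, long delayMillis,
                                    ScheduledExecutorService scheduler,
                                    CompletableFuture<T> result) {
        CompletableFuture<T> current;
        try {
            current = action.get();
        } catch (RuntimeException e) {
            // Treat a synchronous throw the same way as a failed future.
            current = new CompletableFuture<>();
            current.completeExceptionally(e);
        }
        current.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(error);
            } else {
                try {
                    // Reschedule the next attempt; no thread waits during the delay.
                    scheduler.schedule(
                            () -> attempt(action, attemptNo + 1, maxAttempts, delayMillis, scheduler, result),
                            delayMillis, TimeUnit.MILLISECONDS);
                } catch (RejectedExecutionException rejected) {
                    result.completeExceptionally(rejected);
                }
            }
        });
    }

    // Demo: the action fails twice, then succeeds on the third attempt.
    static String demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            return retryAsync(() -> {
                int n = calls.incrementAndGet();
                CompletableFuture<String> future = new CompletableFuture<>();
                if (n < 3) {
                    future.completeExceptionally(new IllegalStateException("attempt " + n + " failed"));
                } else {
                    future.complete("loaded on attempt " + n);
                }
                return future;
            }, 5, 20, scheduler).join();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The join() in the demo blocks only to print the outcome; a real caller would typically chain thenApply or thenAccept on the returned future instead.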

@whpgit

whpgit commented Jun 8, 2023

Haha, yes. I'll take some time to study how CompletableFuture can be applied to this asynchronous scenario.

@whpgit

whpgit commented Jun 9, 2023

Hello, I have roughly implemented this with the CompletableFuture API. Here is my code:

package com.example.myretry;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

@Component
@Slf4j
public class CompletableFutureRetryDemo {



    @Resource
    @Qualifier("poolExecutor")
    ThreadPoolExecutor poolExecutor;
    public final static int DEFAULT_MAX_ATTEMPTS = 3;



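    public <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> supplier, ThreadPoolExecutor executor) {
        return retry(supplier, executor, DEFAULT_MAX_ATTEMPTS);
    }

    // attemptsLeft includes the current attempt, so the supplier runs at most DEFAULT_MAX_ATTEMPTS times.
    private <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> supplier, ThreadPoolExecutor executor, int attemptsLeft) {
        return CompletableFuture.supplyAsync(supplier, executor)
                // Flatten first, so that a failure of the inner future also reaches handle().
                .thenCompose(Function.identity())
                .handle((result, throwable) -> {
                    if (throwable == null) {
                        return CompletableFuture.completedFuture(result);
                    }
                    if (attemptsLeft <= 1) {
                        // Attempts exhausted: propagate the last failure.
                        CompletableFuture<T> failed = new CompletableFuture<>();
                        failed.completeExceptionally(throwable);
                        return failed;
                    }
                    try {
                        Thread.sleep(2000); // note: this still blocks a thread between attempts
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    return retry(supplier, executor, attemptsLeft - 1);
                })
                .thenCompose(Function.identity());
    }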


    public <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> supplier) {
        return retry(supplier, poolExecutor);
    }

    @Bean("poolExecutor")
    public ThreadPoolExecutor poolExecutor() {
        LinkedBlockingQueue<Runnable> asyncRetryThreadPoolQueue = new LinkedBlockingQueue<>(50000);

        return new ThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors(),
                Runtime.getRuntime().availableProcessors(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                asyncRetryThreadPoolQueue,
                new ThreadFactory() {
                    private AtomicInteger threadIndex = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "poolExecutor_" + this.threadIndex.incrementAndGet());
                    }
                });
    }



}

@artembilan
Member

@whpgit ,

Sorry, I'm not sure why you are sharing that code with us.
It doesn't look like it has anything to do with this project.
What is your idea, please?
What do we need to take from your post as a framework contribution or fix?

Thanks for understanding!

@whpgit

whpgit commented Jun 12, 2023

This code is for your reference. Is this a correct way to achieve asynchronous retry with the CompletableFuture API?

I hope you can give me some suggestions.

Thank you very much!
