diff --git a/sdk/clientcore/core/src/main/java/io/clientcore/core/http/models/PagedIterable.java b/sdk/clientcore/core/src/main/java/io/clientcore/core/http/models/PagedIterable.java new file mode 100644 index 0000000000000..bfc08528961ba --- /dev/null +++ b/sdk/clientcore/core/src/main/java/io/clientcore/core/http/models/PagedIterable.java @@ -0,0 +1,206 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package io.clientcore.core.http.models; + +import io.clientcore.core.util.ClientLogger; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * This class provides utility to iterate over {@link PagedResponse} using {@link Stream} and {@link Iterable} + * interfaces. + * + * @param The type of items in the page. + */ +public final class PagedIterable implements Iterable { + + private final Function> pageRetriever; + + /** + * Creates an instance of {@link PagedIterable} that consists of only a single page. This constructor takes a {@code + * Supplier} that return the single page of {@code T}. + * + * @param firstPageRetriever Supplier that retrieves the first page. + */ + public PagedIterable(Supplier> firstPageRetriever) { + this(firstPageRetriever, null); + } + + /** + * Creates an instance of {@link PagedIterable}. The constructor takes a {@code Supplier} and {@code Function}. The + * {@code Supplier} returns the first page of {@code T}, the {@code Function} retrieves subsequent pages of {@code + * T}. + * + * @param firstPageRetriever Supplier that retrieves the first page. + * @param nextPageRetriever Function that retrieves the next page given a continuation token + */ + public PagedIterable(Supplier> firstPageRetriever, + Function> nextPageRetriever) { + this.pageRetriever = (continuationToken) -> (continuationToken == null) + ? firstPageRetriever.get() + : nextPageRetriever.apply(continuationToken); + } + + /** + * {@inheritDoc} + */ + @Override + public Iterator iterator() { + return iterableByItemInternal().iterator(); + } + + /** + * Retrieve the {@link Iterable}, one page at a time. It will provide same {@link Iterable} of T values from + * starting if called multiple times. + * + * @return {@link Iterable} of a pages + */ + public Iterable> iterableByPage() { + return iterableByPageInternal(); + } + + /** + * Utility function to provide {@link Stream} of value {@code T}. + * + * @return {@link Stream} of value {@code T}. + */ + public Stream stream() { + return StreamSupport.stream(iterableByItemInternal().spliterator(), false); + } + + /** + * Retrieve the {@link Stream}, one page at a time. It will provide same {@link Stream} of T values from starting if + * called multiple times. + * + * @return {@link Stream} of a pages + */ + public Stream> streamByPage() { + return StreamSupport.stream(iterableByPage().spliterator(), false); + } + + private Iterable iterableByItemInternal() { + return () -> new PagedByIterator<>(pageRetriever) { + + private final Queue> pages = new ConcurrentLinkedQueue<>(); + private volatile Iterator currentPage; + + @Override + boolean needToRequestPage() { + return (currentPage == null || !currentPage.hasNext()) && pages.peek() == null; + } + + @Override + boolean isNextAvailable() { + return (currentPage != null && currentPage.hasNext()) || pages.peek() != null; + } + + @Override + T getNext() { + if ((currentPage == null || !currentPage.hasNext()) && pages.peek() != null) { + currentPage = pages.poll(); + } + + return currentPage.next(); + } + + @Override + void addPage(PagedResponse page) { + Iterator pageValues = page.getValue().iterator(); + if (pageValues.hasNext()) { + this.pages.add(pageValues); + } + } + }; + } + + private Iterable> iterableByPageInternal() { + return () -> new PagedByIterator>(pageRetriever) { + + private final Queue> pages = new ConcurrentLinkedQueue<>(); + + @Override + boolean needToRequestPage() { + return pages.peek() == null; + } + + @Override + boolean isNextAvailable() { + return pages.peek() != null; + } + + @Override + PagedResponse getNext() { + return pages.poll(); + } + + @Override + void addPage(PagedResponse page) { + this.pages.add(page); + } + }; + } + + private abstract static class PagedByIterator implements Iterator { + private static final ClientLogger LOGGER = new ClientLogger(PagedByIterator.class); + + private final Function> pageRetriever; + private volatile String continuationToken; + private volatile boolean done; + + PagedByIterator(Function> pageRetriever) { + this.pageRetriever = pageRetriever; + } + + @Override + public E next() { + if (!hasNext()) { + throw LOGGER.logThrowableAsError(new NoSuchElementException("Iterator contains no more elements.")); + } + + return getNext(); + } + + @Override + public boolean hasNext() { + // Request next pages in a loop in case we are returned empty pages for the by item implementation. + while (!done && needToRequestPage()) { + requestPage(); + } + + return isNextAvailable(); + } + + abstract boolean needToRequestPage(); + + abstract boolean isNextAvailable(); + + abstract E getNext(); + + synchronized void requestPage() { + AtomicBoolean receivedPages = new AtomicBoolean(false); + PagedResponse page = pageRetriever.apply(continuationToken); + if (page != null) { + receivePage(receivedPages, page); + } + } + + abstract void addPage(PagedResponse page); + + private void receivePage(AtomicBoolean receivedPages, PagedResponse page) { + receivedPages.set(true); + addPage(page); + + continuationToken = page.getNextLink(); + this.done = continuationToken == null || continuationToken.isEmpty(); + } + } +} diff --git a/sdk/clientcore/core/src/main/java/io/clientcore/core/http/models/PagedResponse.java b/sdk/clientcore/core/src/main/java/io/clientcore/core/http/models/PagedResponse.java new file mode 100644 index 0000000000000..634547ef258f2 --- /dev/null +++ b/sdk/clientcore/core/src/main/java/io/clientcore/core/http/models/PagedResponse.java @@ -0,0 +1,112 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package io.clientcore.core.http.models; + +import io.clientcore.core.util.binarydata.BinaryData; + +import java.io.IOException; +import java.util.List; + +/** + * Response of a REST API that returns page. + * + * @see Response + * + * @param The type of items in the page. + */ +public final class PagedResponse implements Response> { + + private final HttpRequest request; + private final int statusCode; + private final HttpHeaders headers; + private final List items; + private final String nextLink; + private final BinaryData body; + + /** + * Creates a new instance of the PagedResponse type. + * + * @param request The HttpRequest that was sent to the service whose response resulted in this response. + * @param statusCode The status code from the response. + * @param headers The headers from the response. + * @param body The body from the response. + * @param items The items returned from the service within the response. + * @param nextLink The next page reference returned from the service, to enable future requests to pick up + * from the same place in the paged iteration. + */ + public PagedResponse(HttpRequest request, int statusCode, HttpHeaders headers, BinaryData body, List items, + String nextLink) { + this.request = request; + this.statusCode = statusCode; + this.headers = headers; + this.body = body; + this.items = items; + this.nextLink = nextLink; + } + + /** + * Gets the reference to the next page. + * + * @return The next page reference or null if there isn't a next page. + */ + public String getNextLink() { + return nextLink; + } + + // TODO + // public String getContinuationToken() {} + // public String getPreviousLink() {} + // public String getFirstLink() {} + // public String getLastLink() {} + + /** + * {@inheritDoc} + */ + @Override + public int getStatusCode() { + return statusCode; + } + + /** + * {@inheritDoc} + */ + @Override + public HttpHeaders getHeaders() { + return headers; + } + + /** + * {@inheritDoc} + */ + @Override + public HttpRequest getRequest() { + return request; + } + + /** + * {@inheritDoc} + */ + @Override + public List getValue() { + return items; + } + + /** + * {@inheritDoc} + */ + @Override + public BinaryData getBody() { + return body; + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + if (body != null) { + body.close(); + } + } +}