From 6dcec1bb09922bf56cb58991eef1e3876cf50dc7 Mon Sep 17 00:00:00 2001 From: psainics Date: Thu, 7 Mar 2024 02:33:00 +0530 Subject: [PATCH] Remove Page iteration from transform --- .../http/source/batch/HttpBatchSource.java | 41 +++++++++---------- .../http/source/batch/HttpInputFormat.java | 3 +- .../http/source/batch/HttpRecordReader.java | 26 +++++++++--- 3 files changed, 41 insertions(+), 29 deletions(-) diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java index cab22532..9fff34d5 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java @@ -47,7 +47,7 @@ @Plugin(type = BatchSource.PLUGIN_TYPE) @Name(HttpBatchSource.NAME) @Description("Read data from HTTP endpoint.") -public class HttpBatchSource extends BatchSource { +public class HttpBatchSource extends BatchSource { static final String NAME = "HTTP"; private static final Logger LOG = LoggerFactory.getLogger(HttpBatchSource.class); @@ -103,28 +103,25 @@ public void initialize(BatchRuntimeContext context) throws Exception { } @Override - public void transform(KeyValue input, Emitter emitter) { - BasePage page = input.getValue(); - while (page.hasNext()) { - PageEntry pageEntry = page.next(); + public void transform(KeyValue input, Emitter emitter) { + PageEntry pageEntry = input.getValue(); - if (!pageEntry.isError()) { - emitter.emit(pageEntry.getRecord()); - } else { - InvalidEntry invalidEntry = pageEntry.getError(); - switch (pageEntry.getErrorHandling()) { - case SKIP: - LOG.warn(invalidEntry.getErrorMsg()); - break; - case SEND: - emitter.emitError(invalidEntry); - break; - case STOP: - throw new RuntimeException(invalidEntry.getErrorMsg()); - default: - throw new UnexpectedFormatException( - String.format("Unknown error handling strategy '%s'", config.getErrorHandling())); - } + if (!pageEntry.isError()) { + emitter.emit(pageEntry.getRecord()); + } else { + InvalidEntry invalidEntry = pageEntry.getError(); + switch (pageEntry.getErrorHandling()) { + case SKIP: + LOG.warn(invalidEntry.getErrorMsg()); + break; + case SEND: + emitter.emitError(invalidEntry); + break; + case STOP: + throw new RuntimeException(invalidEntry.getErrorMsg()); + default: + throw new UnexpectedFormatException( + String.format("Unknown error handling strategy '%s'", config.getErrorHandling())); } } } diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java index c0d82d8f..69cee410 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.source.batch; import io.cdap.plugin.http.common.pagination.page.BasePage; +import io.cdap.plugin.http.common.pagination.page.PageEntry; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -37,7 +38,7 @@ public List getSplits(JobContext jobContext) { } @Override - public RecordReader createRecordReader( + public RecordReader createRecordReader( InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { return new HttpRecordReader(); } diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java index 1f172d3c..d2669be4 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java @@ -20,6 +20,7 @@ import io.cdap.plugin.http.common.pagination.BaseHttpPaginationIterator; import io.cdap.plugin.http.common.pagination.PaginationIteratorFactory; import io.cdap.plugin.http.common.pagination.page.BasePage; +import io.cdap.plugin.http.common.pagination.page.PageEntry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; @@ -34,12 +35,13 @@ * RecordReader implementation, which reads text records representations and http codes * using {@link BaseHttpPaginationIterator} subclasses. */ -public class HttpRecordReader extends RecordReader { +public class HttpRecordReader extends RecordReader { private static final Logger LOG = LoggerFactory.getLogger(HttpRecordReader.class); private static final Gson gson = new GsonBuilder().create(); private BaseHttpPaginationIterator httpPaginationIterator; - private BasePage value; + private BasePage currentBasePage; + private PageEntry value; /** * Initialize an iterator and config. @@ -57,10 +59,22 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont @Override public boolean nextKeyValue() { - if (!httpPaginationIterator.hasNext()) { - return false; + // If there is no current page or no next line in the current page + if (currentBasePage == null || !currentBasePage.hasNext()) { + if (!httpPaginationIterator.hasNext()) { + // If there is no next page, return false + // All pages are read + return false; + } + // Get the next page + currentBasePage = httpPaginationIterator.next(); + // Check if the new page has any lines + if (!currentBasePage.hasNext()) { + // If the new page has no lines, we stop. + return false; + } } - value = httpPaginationIterator.next(); + value = currentBasePage.next(); return true; } @@ -70,7 +84,7 @@ public NullWritable getCurrentKey() { } @Override - public BasePage getCurrentValue() { + public PageEntry getCurrentValue() { return value; }