Skip to content

Commit

Permalink
Merge pull request data-integrations#159 from cloudsufi/patch/reduce-mem
Browse files Browse the repository at this point in the history
[PLUGIN-1757] Remove Page iteration from transform
  • Loading branch information
vikasrathee-cs authored Mar 7, 2024
2 parents 1bd1512 + 6dcec1b commit 557bea0
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 29 deletions.
41 changes: 19 additions & 22 deletions src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(HttpBatchSource.NAME)
@Description("Read data from HTTP endpoint.")
public class HttpBatchSource extends BatchSource<NullWritable, BasePage, StructuredRecord> {
public class HttpBatchSource extends BatchSource<NullWritable, PageEntry, StructuredRecord> {
static final String NAME = "HTTP";

private static final Logger LOG = LoggerFactory.getLogger(HttpBatchSource.class);
Expand Down Expand Up @@ -103,28 +103,25 @@ public void initialize(BatchRuntimeContext context) throws Exception {
}

@Override
public void transform(KeyValue<NullWritable, BasePage> input, Emitter<StructuredRecord> emitter) {
BasePage page = input.getValue();
while (page.hasNext()) {
PageEntry pageEntry = page.next();
public void transform(KeyValue<NullWritable, PageEntry> input, Emitter<StructuredRecord> emitter) {
PageEntry pageEntry = input.getValue();

if (!pageEntry.isError()) {
emitter.emit(pageEntry.getRecord());
} else {
InvalidEntry<StructuredRecord> invalidEntry = pageEntry.getError();
switch (pageEntry.getErrorHandling()) {
case SKIP:
LOG.warn(invalidEntry.getErrorMsg());
break;
case SEND:
emitter.emitError(invalidEntry);
break;
case STOP:
throw new RuntimeException(invalidEntry.getErrorMsg());
default:
throw new UnexpectedFormatException(
String.format("Unknown error handling strategy '%s'", config.getErrorHandling()));
}
if (!pageEntry.isError()) {
emitter.emit(pageEntry.getRecord());
} else {
InvalidEntry<StructuredRecord> invalidEntry = pageEntry.getError();
switch (pageEntry.getErrorHandling()) {
case SKIP:
LOG.warn(invalidEntry.getErrorMsg());
break;
case SEND:
emitter.emitError(invalidEntry);
break;
case STOP:
throw new RuntimeException(invalidEntry.getErrorMsg());
default:
throw new UnexpectedFormatException(
String.format("Unknown error handling strategy '%s'", config.getErrorHandling()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.cdap.plugin.http.source.batch;

import io.cdap.plugin.http.common.pagination.page.BasePage;
import io.cdap.plugin.http.common.pagination.page.PageEntry;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
Expand All @@ -37,7 +38,7 @@ public List<InputSplit> getSplits(JobContext jobContext) {
}

@Override
public RecordReader<NullWritable, BasePage> createRecordReader(
public RecordReader<NullWritable, PageEntry> createRecordReader(
InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
return new HttpRecordReader();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.cdap.plugin.http.common.pagination.BaseHttpPaginationIterator;
import io.cdap.plugin.http.common.pagination.PaginationIteratorFactory;
import io.cdap.plugin.http.common.pagination.page.BasePage;
import io.cdap.plugin.http.common.pagination.page.PageEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
Expand All @@ -34,12 +35,13 @@
* RecordReader implementation, which reads text records representations and http codes
* using {@link BaseHttpPaginationIterator} subclasses.
*/
public class HttpRecordReader extends RecordReader<NullWritable, BasePage> {
public class HttpRecordReader extends RecordReader<NullWritable, PageEntry> {
private static final Logger LOG = LoggerFactory.getLogger(HttpRecordReader.class);
private static final Gson gson = new GsonBuilder().create();

private BaseHttpPaginationIterator httpPaginationIterator;
private BasePage value;
private BasePage currentBasePage;
private PageEntry value;

/**
* Initialize an iterator and config.
Expand All @@ -57,10 +59,22 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont

@Override
public boolean nextKeyValue() {
if (!httpPaginationIterator.hasNext()) {
return false;
// If there is no current page or no next line in the current page
if (currentBasePage == null || !currentBasePage.hasNext()) {
if (!httpPaginationIterator.hasNext()) {
// If there is no next page, return false
// All pages are read
return false;
}
// Get the next page
currentBasePage = httpPaginationIterator.next();
// Check if the new page has any lines
if (!currentBasePage.hasNext()) {
// If the new page has no lines, we stop.
return false;
}
}
value = httpPaginationIterator.next();
value = currentBasePage.next();
return true;
}

Expand All @@ -70,7 +84,7 @@ public NullWritable getCurrentKey() {
}

@Override
public BasePage getCurrentValue() {
public PageEntry getCurrentValue() {
return value;
}

Expand Down

0 comments on commit 557bea0

Please sign in to comment.