From c38c90144b69204e82235d4a920382c7fefc2dbe Mon Sep 17 00:00:00 2001 From: "vitaly.terentyev" Date: Wed, 22 Jun 2022 17:22:16 +0400 Subject: [PATCH 1/3] Add HasOffset interface. Add ability to start HubspotReceiver from given start offset. --- .../hubspot/common/HubspotPagesIterator.java | 12 +++++- .../hubspot/source/streaming/HasOffset.java | 29 +++++++++++++++ .../source/streaming/HubspotReceiver.java | 37 +++++++++++++++---- 3 files changed, 69 insertions(+), 9 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/hubspot/source/streaming/HasOffset.java diff --git a/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java b/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java index 696260f..7e6a334 100644 --- a/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java +++ b/src/main/java/io/cdap/plugin/hubspot/common/HubspotPagesIterator.java @@ -28,7 +28,7 @@ public class HubspotPagesIterator implements Iterator { private HubspotPage currentPage; private Iterator currentPageIterator; private int iteratorPosition = 0; - private String currentPageOffset = null; + private String currentPageOffset; /** * Constructor for HubspotPagesIterator object. @@ -43,8 +43,12 @@ public HubspotPagesIterator(SourceHubspotConfig config, HubspotPage currentPage, this.currentPageOffset = currentPageOffset; } + public HubspotPagesIterator(SourceHubspotConfig config, String offset) throws IOException { + this(config, new HubspotHelper().getHubspotPage(config, offset), offset); + } + public HubspotPagesIterator(SourceHubspotConfig config) throws IOException { - this(config, new HubspotHelper().getHubspotPage(config, null), null); + this(config, null); } /** @@ -91,6 +95,10 @@ public int getIteratorPosition() { return iteratorPosition; } + public HubspotPage getCurrentPage() { + return currentPage; + } + /** * Here, just set the position of iteration. * @param iteratorPosition the iterator position diff --git a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HasOffset.java b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HasOffset.java new file mode 100644 index 0000000..6d4931f --- /dev/null +++ b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HasOffset.java @@ -0,0 +1,29 @@ +/* + * Copyright 2019 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.hubspot.source.streaming; + +import org.apache.spark.streaming.receiver.Receiver; + +/** Interface for any Spark {@link Receiver} that supports reading from and to some offset. */ +public interface HasOffset { + + /** @param offset inclusive start offset from which the reading should be started. */ + void setStartOffset(Long offset); + + /** @return exclusive end offset to which the reading from current page will occur. */ + Long getEndOffset(); +} diff --git a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java index b5bb661..8804894 100644 --- a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java +++ b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java @@ -33,10 +33,12 @@ /** * Implementation of Spark receiver to receive Salesforce push topic events. */ -public class HubspotReceiver extends Receiver { +public class HubspotReceiver extends Receiver implements HasOffset { private static final Logger LOG = LoggerFactory.getLogger(HubspotReceiver.class); private static final String RECEIVER_THREAD_NAME = "hubspot_api_listener"; private final HubspotStreamingSourceConfig config; + private String startOffset = null; + private Long endOffset = Long.MAX_VALUE; HubspotReceiver(HubspotStreamingSourceConfig config) throws IOException { super(StorageLevel.MEMORY_AND_DISK_2()); @@ -44,10 +46,23 @@ public class HubspotReceiver extends Receiver { } @Override + public void setStartOffset(Long startOffset) { + if (startOffset != null) { + //startOffset - 1, because offset should be inclusive + this.startOffset = String.valueOf(startOffset == 0L ? 0 : startOffset - 1); + } + } + + public HubspotStreamingSourceConfig getConfig() { + return config; + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") public void onStart() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() - .setNameFormat(RECEIVER_THREAD_NAME + "-%d") - .build(); + .setNameFormat(RECEIVER_THREAD_NAME + "-%d") + .build(); Executors.newSingleThreadExecutor(namedThreadFactory).submit(this::receive); } @@ -58,13 +73,21 @@ public void onStop() { // is designed to stop by itself if isStopped() returns false } + @Override + public Long getEndOffset() { + return endOffset; + } + private void receive() { try { - HubspotPagesIterator hubspotPagesIterator = new HubspotPagesIterator(config); + HubspotPagesIterator hubspotPagesIterator = new HubspotPagesIterator(config, startOffset); while (!isStopped()) { + this.endOffset = Long.parseLong(hubspotPagesIterator.getCurrentPage().getOffset()); if (hubspotPagesIterator.hasNext()) { - store(hubspotPagesIterator.next().toString()); + if (!isStopped()) { + store(hubspotPagesIterator.next().toString()); + } } else { Integer minutesToSleep = config.getPullFrequency().getMinutesValue(); LOG.debug(String.format("Waiting for '%d' minutes to pull.", minutesToSleep)); @@ -72,11 +95,11 @@ private void receive() { // reload current page HubspotPage currentPage = new HubspotHelper().getHubspotPage(config, - hubspotPagesIterator.getCurrentPageOffset()); + hubspotPagesIterator.getCurrentPageOffset()); int iteratorPosition = hubspotPagesIterator.getIteratorPosition(); hubspotPagesIterator = new HubspotPagesIterator(config, currentPage, - hubspotPagesIterator.getCurrentPageOffset()); + hubspotPagesIterator.getCurrentPageOffset()); hubspotPagesIterator.setIteratorPosition(iteratorPosition); } } From c7869a48ae795acc36b20e25012b87beb5aa6899 Mon Sep 17 00:00:00 2001 From: akashorabek Date: Thu, 1 Sep 2022 17:25:29 +0600 Subject: [PATCH 2/3] Replaced the local Hashoffset interface with the Apache Beam SparkReceiverIO interface --- pom.xml | 5 ++++ .../hubspot/source/streaming/HasOffset.java | 29 ------------------- .../source/streaming/HubspotReceiver.java | 1 + 3 files changed, 6 insertions(+), 29 deletions(-) delete mode 100644 src/main/java/io/cdap/plugin/hubspot/source/streaming/HasOffset.java diff --git a/pom.xml b/pom.xml index de0bcc9..912e947 100644 --- a/pom.xml +++ b/pom.xml @@ -399,6 +399,11 @@ ${wiremock.version} test + + org.apache.beam + beam-sdks-java-io-sparkreceiver + 2.41.0 + diff --git a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HasOffset.java b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HasOffset.java deleted file mode 100644 index 6d4931f..0000000 --- a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HasOffset.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2019 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.plugin.hubspot.source.streaming; - -import org.apache.spark.streaming.receiver.Receiver; - -/** Interface for any Spark {@link Receiver} that supports reading from and to some offset. */ -public interface HasOffset { - - /** @param offset inclusive start offset from which the reading should be started. */ - void setStartOffset(Long offset); - - /** @return exclusive end offset to which the reading from current page will occur. */ - Long getEndOffset(); -} diff --git a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java index 8804894..951e3fa 100644 --- a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java +++ b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java @@ -20,6 +20,7 @@ import io.cdap.plugin.hubspot.common.HubspotHelper; import io.cdap.plugin.hubspot.common.HubspotPage; import io.cdap.plugin.hubspot.common.HubspotPagesIterator; +import org.apache.beam.sdk.io.sparkreceiver.HasOffset; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; import org.slf4j.Logger; From d3fc6673f83e72c4137c3b4b3511edb62660ff7f Mon Sep 17 00:00:00 2001 From: akashorabek Date: Wed, 21 Sep 2022 13:57:38 +0600 Subject: [PATCH 3/3] Format comments --- .../cdap/plugin/hubspot/source/streaming/HubspotReceiver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java index 951e3fa..480cb2b 100644 --- a/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java +++ b/src/main/java/io/cdap/plugin/hubspot/source/streaming/HubspotReceiver.java @@ -49,7 +49,7 @@ public class HubspotReceiver extends Receiver implements HasOffset { @Override public void setStartOffset(Long startOffset) { if (startOffset != null) { - //startOffset - 1, because offset should be inclusive + // startOffset - 1, because offset should be inclusive. this.startOffset = String.valueOf(startOffset == 0L ? 0 : startOffset - 1); } } @@ -106,7 +106,7 @@ private void receive() { } } catch (Exception e) { String errorMessage = "Exception while receiving messages from hubspot"; - /* TO DO https://issues.cask.co/browse/PLUGIN-357 + /* TODO https://issues.cask.co/browse/PLUGIN-357 The receiver will get terminated on error and stop receiving messages. Retry Logic needs to be implemented. */