diff --git a/src/main/java/io/cdap/plugin/google/common/APIRequestRetryer.java b/src/main/java/io/cdap/plugin/google/common/APIRequestRetryer.java index 334498f..c64ec29 100644 --- a/src/main/java/io/cdap/plugin/google/common/APIRequestRetryer.java +++ b/src/main/java/io/cdap/plugin/google/common/APIRequestRetryer.java @@ -51,8 +51,8 @@ public abstract class APIRequestRetryer { protected static final int LIMIT_RATE_EXCEEDED_CODE = 403; protected static final int BACKEND_ERROR_CODE = 500; protected static final int SERVICE_UNAVAILABLE_CODE = 503; - private static final int MAX_RETRY_WAIT = 200; - private static final int MAX_RETRY_COUNT = 8; + private static final int MAX_RETRY_WAIT = 300; + private static final int MAX_RETRY_COUNT = 10; private static final int MAX_RETRY_JITTER_WAIT = 100; protected static final String TOO_MANY_REQUESTS_MESSAGE = "Too Many Requests"; protected static final String LIMIT_RATE_EXCEEDED_MESSAGE = "Rate Limit Exceeded"; @@ -76,7 +76,7 @@ public void onRetry(Attempt attempt) { GoogleJsonResponseException e = (GoogleJsonResponseException) exceptionCause; LOG.warn(String.format( "Error code: '%d', message: '%s'. Attempt: '%d'. Delay since first: '%d'. Description: '%s'.", - e.getDetails().getCode(), + getExceptionStatusCode(e), e.getStatusMessage(), attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt(), @@ -125,21 +125,26 @@ private static boolean checkHttpResponseException(Throwable t) { private static boolean isTooManyRequestsError(GoogleJsonResponseException e) { List possibleMessages = Arrays.asList(TOO_MANY_REQUESTS_MESSAGE, LIMIT_RATE_EXCEEDED_MESSAGE); - return e.getDetails().getCode() == TOO_MANY_REQUESTS_CODE && possibleMessages.contains(e.getStatusMessage()); + return getExceptionStatusCode(e) == TOO_MANY_REQUESTS_CODE && possibleMessages.contains( + e.getStatusMessage()); } private static boolean isRateLimitError(GoogleJsonResponseException e) { - return e.getDetails().getCode() == LIMIT_RATE_EXCEEDED_CODE + return getExceptionStatusCode(e) == LIMIT_RATE_EXCEEDED_CODE && (LIMIT_RATE_EXCEEDED_MESSAGE.equals(e.getStatusMessage()) || e.getDetails().getMessage().contains(LIMIT_RATE_EXCEEDED_MESSAGE)); } private static boolean isBackendError(GoogleJsonResponseException e) { - return e.getDetails().getCode() == BACKEND_ERROR_CODE; + return getExceptionStatusCode(e) == BACKEND_ERROR_CODE; } private static boolean isServiceUnavailableError(GoogleJsonResponseException e) { - return e.getDetails().getCode() == SERVICE_UNAVAILABLE_CODE; + return getExceptionStatusCode(e) == SERVICE_UNAVAILABLE_CODE; + } + + private static Integer getExceptionStatusCode(GoogleJsonResponseException e) { + return e.getDetails() != null ? e.getDetails().getCode() : null; } private static boolean isRateLimitError(HttpResponseException e) { diff --git a/src/main/java/io/cdap/plugin/google/common/GoogleDriveFilteringClient.java b/src/main/java/io/cdap/plugin/google/common/GoogleDriveFilteringClient.java index a666d3f..c306e56 100644 --- a/src/main/java/io/cdap/plugin/google/common/GoogleDriveFilteringClient.java +++ b/src/main/java/io/cdap/plugin/google/common/GoogleDriveFilteringClient.java @@ -72,9 +72,10 @@ public List getFilesSummary(List exportedTypes, int filesNum int retrievedFiles = 0; int actualFilesNumber = filesNumber; if (IdentifierType.FILE_IDENTIFIER.equals(config.getIdentifierType())) { - files.add(service.files().get(config.getFileIdentifier()).setSupportsAllDrives(true).execute()); + files.add(getFilesSummaryByFileId()); return files; } + Drive.Files.List request = service.files().list() .setSupportsAllDrives(true) .setIncludeItemsFromAllDrives(true) @@ -99,6 +100,10 @@ public List getFilesSummary(List exportedTypes, int filesNum }); } + protected File getFilesSummaryByFileId() throws IOException, ExecutionException { + return service.files().get(config.getFileIdentifier()).setSupportsAllDrives(true).execute(); + } + private String generateFilter(List exportedTypes) throws InterruptedException { StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsFilteringClient.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsFilteringClient.java new file mode 100644 index 0000000..624b61e --- /dev/null +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsFilteringClient.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.google.sheets.source; + +import com.google.api.services.drive.model.File; +import io.cdap.plugin.google.common.GoogleDriveFilteringClient; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +/** + * Client for getting File information via Google Sheets API. + */ +public class GoogleSheetsFilteringClient extends GoogleDriveFilteringClient { + + public GoogleSheetsFilteringClient(GoogleSheetsSourceConfig config) throws IOException { + super(config); + } + + @Override + protected File getFilesSummaryByFileId() throws IOException, ExecutionException { + File file = service.files().get(config.getFileIdentifier()).setSupportsAllDrives(true).execute(); + if (!file.getMimeType().equalsIgnoreCase(DRIVE_SPREADSHEETS_MIME)) { + throw new ExecutionException( + String.format("File with id: '%s' has a MIME_TYPE '%s' and is not a Google Sheets File.", + file.getMimeType(), + config.getFileIdentifier()), null); + } + return file; + } +} diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsInputFormat.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsInputFormat.java index 8796f69..5fd1f40 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsInputFormat.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsInputFormat.java @@ -57,7 +57,7 @@ public List getSplits(JobContext jobContext) throws IOException { GoogleSheetsInputFormatProvider.GSON.fromJson(headersJson, headersType); // get all sheets files according to filter - GoogleDriveFilteringClient driveFilteringClient = new GoogleDriveFilteringClient(googleSheetsSourceConfig); + GoogleDriveFilteringClient driveFilteringClient = new GoogleSheetsFilteringClient(googleSheetsSourceConfig); List spreadsheetsFiles; try { spreadsheetsFiles = driveFilteringClient.getFilesSummary(Collections.singletonList(ExportedType.SPREADSHEETS)); diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java index 03939c8..161835d 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java @@ -334,7 +334,7 @@ public ValidationResult validate(FailureCollector collector) { GoogleDriveFilteringClient driveClient; GoogleSheetsSourceClient sheetsSourceClient; try { - driveClient = new GoogleDriveFilteringClient(this); + driveClient = new GoogleSheetsFilteringClient(this); sheetsSourceClient = new GoogleSheetsSourceClient(this); } catch (IOException e) { collector.addFailure("Exception during drive and sheets connections instantiating.", null); diff --git a/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java b/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java index e017988..33bdba9 100644 --- a/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java +++ b/src/main/java/io/cdap/plugin/google/sheets/source/SheetTransformer.java @@ -69,7 +69,8 @@ public static StructuredRecord transform(RowRecord rowRecord, Schema schema, boo builder.set(metadataRecordName, rowRecord.getMetadata()); } else { ComplexSingleValueColumn complexSingleValueColumn = rowRecord.getHeaderedCells().get(name); - if (complexSingleValueColumn.getData() == null && complexSingleValueColumn.getSubColumns().isEmpty()) { + if (complexSingleValueColumn == null || (complexSingleValueColumn.getData() == null + && complexSingleValueColumn.getSubColumns().isEmpty())) { builder.set(name, null); } else { processCellData(builder, field, complexSingleValueColumn);