diff --git a/src/e2e-test/java/io/cdap/plugin/servicenowsink/actions/ServiceNowSinkPropertiesPageActions.java b/src/e2e-test/java/io/cdap/plugin/servicenowsink/actions/ServiceNowSinkPropertiesPageActions.java index b8e618fa..fdb04018 100644 --- a/src/e2e-test/java/io/cdap/plugin/servicenowsink/actions/ServiceNowSinkPropertiesPageActions.java +++ b/src/e2e-test/java/io/cdap/plugin/servicenowsink/actions/ServiceNowSinkPropertiesPageActions.java @@ -50,7 +50,7 @@ public class ServiceNowSinkPropertiesPageActions { private static Gson gson = new Gson(); public static void getRecordFromServiceNowTable(String query, String tableName) - throws OAuthProblemException, OAuthSystemException { + throws OAuthProblemException, OAuthSystemException, IOException { config = new ServiceNowSourceConfig( "", "", "", "", "", System.getenv("SERVICE_NOW_CLIENT_ID"), @@ -58,7 +58,7 @@ public static void getRecordFromServiceNowTable(String query, String tableName) System.getenv("SERVICE_NOW_REST_API_ENDPOINT"), System.getenv("SERVICE_NOW_USERNAME"), System.getenv("SERVICE_NOW_PASSWORD"), - "", "", ""); + "", "", "", null); ServiceNowTableAPIClientImpl tableAPIClient = new ServiceNowTableAPIClientImpl(config.getConnection()); responseFromServiceNowTable = tableAPIClient.getRecordFromServiceNowTable(tableName, query); diff --git a/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java index 4e9f4727..ee4e27a8 100644 --- a/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/tests/hooks/TestSetupHooks.java @@ -58,7 +58,7 @@ public static void initializeServiceNowSourceConfig() { System.getenv("SERVICE_NOW_REST_API_ENDPOINT"), System.getenv("SERVICE_NOW_USERNAME"), System.getenv("SERVICE_NOW_PASSWORD"), - "", "", ""); + "", "", "", null); } @Before(order = 2, value = "@SN_PRODUCT_CATALOG_ITEM") diff --git a/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java b/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java index 08eff62b..2e2dcc63 100644 --- a/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java +++ b/src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java @@ -267,12 +267,22 @@ public SchemaResponse parseSchemaResponse(String responseBody) { */ public Schema fetchTableSchema(String tableName) throws OAuthProblemException, OAuthSystemException, IOException { + return fetchTableSchema(tableName, getAccessToken()); + } + + /** + * Fetches the table schema from ServiceNow + * + * @param tableName ServiceNow table name for which schema is getting fetched + * @param accessToken Access Token to use + * @return schema for given ServiceNow table + */ + public Schema fetchTableSchema(String tableName, String accessToken) throws IOException { ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( this.conf.getRestApiEndpoint(), tableName, true) .setExcludeReferenceLink(true); RestAPIResponse apiResponse; - String accessToken = getAccessToken(); requestBuilder.setAuthHeader(accessToken); apiResponse = executeGet(requestBuilder.build()); SchemaResponse response = parseSchemaResponse(apiResponse.getResponseBody()); @@ -297,13 +307,24 @@ public Schema fetchTableSchema(String tableName) */ public int getTableRecordCount(String tableName) throws OAuthProblemException, OAuthSystemException, IOException { + return getTableRecordCount(tableName, getAccessToken()); + } + + /** + * Get the total number of records in the table + * + * @param tableName ServiceNow table name for which record count is fetched. + * @param accessToken Access Token for the call + * @return the table record count + * @throws IOException + */ + public int getTableRecordCount(String tableName, String accessToken) throws IOException { ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( this.conf.getRestApiEndpoint(), tableName, false) .setExcludeReferenceLink(true) .setDisplayValue(SourceValueType.SHOW_DISPLAY_VALUE) .setLimit(1); RestAPIResponse apiResponse = null; - String accessToken = getAccessToken(); requestBuilder.setResponseHeaders(ServiceNowConstants.HEADER_NAME_TOTAL_COUNT); requestBuilder.setAuthHeader(accessToken); apiResponse = executeGet(requestBuilder.build()); diff --git a/src/main/java/io/cdap/plugin/servicenow/connector/ServiceNowConnector.java b/src/main/java/io/cdap/plugin/servicenow/connector/ServiceNowConnector.java index 0109f894..5f1061c3 100644 --- a/src/main/java/io/cdap/plugin/servicenow/connector/ServiceNowConnector.java +++ b/src/main/java/io/cdap/plugin/servicenow/connector/ServiceNowConnector.java @@ -89,12 +89,26 @@ public void test(ConnectorContext connectorContext) throws ValidationException { @Override public BrowseDetail browse(ConnectorContext connectorContext, BrowseRequest browseRequest) throws IOException { + ServiceNowTableAPIClientImpl serviceNowTableAPIClient = new ServiceNowTableAPIClientImpl(config); + try { + String accessToken = serviceNowTableAPIClient.getAccessToken(); + return browse(connectorContext, accessToken); + } catch (OAuthSystemException | OAuthProblemException e) { + throw new IOException(e); + } + } + + /** + * Browse Details for the given AccessToken. + */ + public BrowseDetail browse(ConnectorContext connectorContext, + String accessToken) throws IOException { int count = 0; FailureCollector collector = connectorContext.getFailureCollector(); config.validateCredentialsFields(collector); collector.getOrThrowException(); BrowseDetail.Builder browseDetailBuilder = BrowseDetail.builder(); - Table[] table = listTables().getResult(); + Table[] table = listTables(accessToken).getResult(); for (int i = 0; i < table.length; i++) { String name = table[i].getName(); String label = table[i].getLabel(); @@ -108,23 +122,16 @@ public BrowseDetail browse(ConnectorContext connectorContext, BrowseRequest brow return browseDetailBuilder.setTotalCount(count).build(); } - /** * @return the list of tables. */ - private TableList listTables() throws IOException { + private TableList listTables(String accessToken) throws IOException { ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder( config.getRestApiEndpoint(), OBJECT_TABLE_LIST, false); - String accessToken = null; - ServiceNowTableAPIClientImpl serviceNowTableAPIClient = new ServiceNowTableAPIClientImpl(config); - try { - accessToken = serviceNowTableAPIClient.getAccessToken(); - } catch (OAuthSystemException | OAuthProblemException e) { - throw new IOException(e); - } requestBuilder.setAuthHeader(accessToken); requestBuilder.setAcceptHeader(MediaType.APPLICATION_JSON); requestBuilder.setContentTypeHeader(MediaType.APPLICATION_JSON); + ServiceNowTableAPIClientImpl serviceNowTableAPIClient = new ServiceNowTableAPIClientImpl(config); RestAPIResponse apiResponse = serviceNowTableAPIClient.executeGet(requestBuilder.build()); return GSON.fromJson(apiResponse.getResponseBody(), TableList.class); } diff --git a/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowRecordReader.java b/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowRecordReader.java index c4315031..16459a69 100644 --- a/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowRecordReader.java +++ b/src/main/java/io/cdap/plugin/servicenow/source/ServiceNowRecordReader.java @@ -20,15 +20,12 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl; import io.cdap.plugin.servicenow.connector.ServiceNowRecordConverter; -import io.cdap.plugin.servicenow.util.ServiceNowConstants; +import io.cdap.plugin.servicenow.util.ServiceNowTableInfo; import io.cdap.plugin.servicenow.util.SourceQueryMode; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.oltu.oauth2.common.exception.OAuthProblemException; -import org.apache.oltu.oauth2.common.exception.OAuthSystemException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -48,23 +45,20 @@ public ServiceNowRecordReader(ServiceNowSourceConfig pluginConf) { @Override public void initialize(InputSplit split, TaskAttemptContext context) { - this.split = (ServiceNowInputSplit) split; - this.pos = 0; - restApi = new ServiceNowTableAPIClientImpl(pluginConf.getConnection()); - tableName = ((ServiceNowInputSplit) split).getTableName(); - tableNameField = pluginConf.getTableNameField(); - fetchSchema(restApi); + initialize(split); + fetchAndInitializeSchema(new ServiceNowJobConfiguration(context.getConfiguration()).getTableInfos(), tableName); } /** - * Initialize with only the provided split. + * Initialize with only the provided split and given schema * This method should not be called directly from the code, * as Hadoop runtime initialize internally during execution. * * @param split Split to read by the current reader. */ - public void initialize(InputSplit split) { - initialize(split, null); + public void initialize(InputSplit split, Schema schema) { + initialize(split); + initializeSchema(tableName, schema); } @Override @@ -118,20 +112,36 @@ private void fetchData() throws IOException { iterator = results.iterator(); } - private void fetchSchema(ServiceNowTableAPIClientImpl restApi) { - try { - Schema tempSchema = restApi.fetchTableSchema(tableName); - tableFields = tempSchema.getFields(); - List schemaFields = new ArrayList<>(tableFields); + protected void initialize(InputSplit split) { + this.split = (ServiceNowInputSplit) split; + this.pos = 0; + restApi = new ServiceNowTableAPIClientImpl(pluginConf.getConnection()); + tableName = ((ServiceNowInputSplit) split).getTableName(); + tableNameField = pluginConf.getTableNameField(); + } - if (pluginConf.getQueryMode() == SourceQueryMode.REPORTING) { - schemaFields.add(Schema.Field.of(tableNameField, Schema.of(Schema.Type.STRING))); - } + /** + * Fetches the schema of the given tableName from the tableInfos and initialize schema using it. + * + * @param tableInfos List of TableInfo objects containing TableName, RecordCount and Schema. + * @param tableName Table Name to initialize this reader for. + */ + private void fetchAndInitializeSchema(List tableInfos, String tableName) { + Schema tempSchema = tableInfos.stream() + .filter((tableInfo) -> tableInfo.getTableName().equalsIgnoreCase(tableName)) + .findFirst().get().getSchema(); - schema = Schema.recordOf(tableName, schemaFields); - } catch (OAuthProblemException | OAuthSystemException | IOException e) { - throw new RuntimeException(e); - } + initializeSchema(tableName, tempSchema); } + private void initializeSchema(String tableName, Schema schema) { + tableFields = schema.getFields(); + List schemaFields = new ArrayList<>(tableFields); + + if (pluginConf.getQueryMode() == SourceQueryMode.REPORTING) { + schemaFields.add(Schema.Field.of(tableNameField, Schema.of(Schema.Type.STRING))); + } + + this.schema = Schema.recordOf(tableName, schemaFields); + } } diff --git a/src/test/java/io/cdap/plugin/servicenow/sink/ServiceNowSinkConfigTest.java b/src/test/java/io/cdap/plugin/servicenow/sink/ServiceNowSinkConfigTest.java index a5e516f4..70e359db 100644 --- a/src/test/java/io/cdap/plugin/servicenow/sink/ServiceNowSinkConfigTest.java +++ b/src/test/java/io/cdap/plugin/servicenow/sink/ServiceNowSinkConfigTest.java @@ -318,7 +318,7 @@ public void testValidateSchema() throws Exception { Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); PowerMockito.when(RestAPIResponse.parse(httpResponse, null)).thenReturn(response); Mockito.when(restApi.executeGet(Mockito.any(RestAPIRequest.class))).thenReturn(restAPIResponse); - Mockito.when(restApi.fetchTableSchema(Mockito.anyString(), Mockito.any())).thenReturn(schema); + Mockito.when(restApi.fetchTableSchema(Mockito.anyString(), Mockito.any(FailureCollector.class))).thenReturn(schema); Mockito.when(restApi.parseSchemaResponse(restAPIResponse.getResponseBody())) .thenReturn(schemaResponse); try { diff --git a/src/test/java/io/cdap/plugin/servicenow/source/ServiceNowRecordReaderTest.java b/src/test/java/io/cdap/plugin/servicenow/source/ServiceNowRecordReaderTest.java index 1527d593..80567417 100644 --- a/src/test/java/io/cdap/plugin/servicenow/source/ServiceNowRecordReaderTest.java +++ b/src/test/java/io/cdap/plugin/servicenow/source/ServiceNowRecordReaderTest.java @@ -187,7 +187,7 @@ public void testFetchData() throws Exception { serviceNowSourceConfig.getPageSize())).thenReturn(results); Mockito.when(restApi.fetchTableSchema(tableName)) .thenReturn(Schema.recordOf(Schema.Field.of("calendar_integration", Schema.of(Schema.Type.STRING)))); - serviceNowRecordReader.initialize(split, null); + serviceNowRecordReader.initialize(split); Assert.assertTrue(serviceNowRecordReader.nextKeyValue()); } @@ -241,7 +241,7 @@ public void testFetchDataReportingMode() throws Exception { serviceNowSourceConfig.getPageSize())).thenReturn(results); Mockito.when(restApi.fetchTableSchema(tableName)) .thenReturn(Schema.recordOf(Schema.Field.of("calendar_integration", Schema.of(Schema.Type.STRING)))); - serviceNowRecordReader.initialize(split, null); + serviceNowRecordReader.initialize(split); Assert.assertTrue(serviceNowRecordReader.nextKeyValue()); } @@ -276,7 +276,7 @@ public void testFetchDataOnInvalidTable() throws Exception { response.setResult(results); Mockito.when(restApi.fetchTableSchema(tableName)) .thenReturn(Schema.recordOf(Schema.Field.of("calendar_integration", Schema.of(Schema.Type.STRING)))); - serviceNowRecordReader.initialize(split, null); + serviceNowRecordReader.initialize(split); Assert.assertFalse(serviceNowRecordReader.nextKeyValue()); } }