From 10cbe206ab56d7f68917d36926af2be2915ff366 Mon Sep 17 00:00:00 2001 From: anouri Date: Thu, 23 Apr 2020 17:02:21 +0200 Subject: [PATCH] support all CP4D connection types #11 --- package/docs/source/conf.py | 2 +- package/streamsx/hdfs/__init__.py | 2 +- package/streamsx/hdfs/_hdfs.py | 63 +++++++------------------------ 3 files changed, 16 insertions(+), 51 deletions(-) diff --git a/package/docs/source/conf.py b/package/docs/source/conf.py index 0ad8a9b..07b88af 100644 --- a/package/docs/source/conf.py +++ b/package/docs/source/conf.py @@ -67,7 +67,7 @@ # The short X.Y version. version = '1.5' # The full version, including alpha/beta/rc tags. -release = '1.5.0' +release = '1.5.5' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/package/streamsx/hdfs/__init__.py b/package/streamsx/hdfs/__init__.py index d1be31e..0a6e24c 100644 --- a/package/streamsx/hdfs/__init__.py +++ b/package/streamsx/hdfs/__init__.py @@ -65,7 +65,7 @@ """ -__version__='1.5.0' +__version__='1.5.5' __all__ = ['HdfsDirectoryScan', 'HdfsFileSink', 'HdfsFileSource', 'HdfsFileCopy', 'download_toolkit', 'configure_connection', 'scan', 'read', 'write'] from streamsx.hdfs._hdfs import download_toolkit, configure_connection, scan, read, write, copy, HdfsDirectoryScan, HdfsFileSink, HdfsFileSource, HdfsFileCopy diff --git a/package/streamsx/hdfs/_hdfs.py b/package/streamsx/hdfs/_hdfs.py index 9ac09c1..362f5b2 100644 --- a/package/streamsx/hdfs/_hdfs.py +++ b/package/streamsx/hdfs/_hdfs.py @@ -41,60 +41,25 @@ def _add_toolkit_dependency(topo): # This is important when toolkit is not set with streamsx.spl.toolkit.add_toolkit (selecting toolkit from remote build service) streamsx.spl.toolkit.add_toolkit_dependency(topo, _TOOLKIT_NAME, '[5.0.0,6.0.0)') -def _read_service_credentials(credentials): +def _read_service_credentials(connection): hdfs_uri = "" user = "" password = "" - if isinstance(credentials, dict): - # check for Analytics Engine service credentials - if 'cluster' in credentials: - user = credentials.get('cluster').get('user') - password = credentials.get('cluster').get('password') - hdfs_uri = credentials.get('cluster').get('service_endpoints').get('webhdfs') + if isinstance(connection, dict): + type = str(connection.get('type')) + # check the connection type + if ('GenericWebHDFS' in type): + user = connection.get('username') + password = connection.get('password') + host_port = connection.get('host') + ':' + str(connection.get('port')) + hdfs_uri = "webhdfs://" + host_port else: - if 'webhdfs' in credentials: - user = credentials.get('user') - password = credentials.get('password') - hdfs_uri = credentials.get('webhdfs') - else: - raise ValueError(credentials) + host_port = connection.get('host') + ':' + str(connection.get('port')) + hdfs_uri = "hdfs://" + host_port else: - raise TypeError(credentials) - # construct expected format for hdfs_uri: webhdfs://host:port - uri_parsed = urlparse(hdfs_uri) - hdfs_uri = 'webhdfs://'+uri_parsed.netloc - return hdfs_uri, user, password - -def _check_vresion_credentials1(credentials, _op, topology): - if credentials is not None: + raise TypeError(connection) - # check streamsx.hdfs version - _add_toolkit_dependency(topology) - # if credentials is None: - # raise SystemExit("Error: credentials is empty.") - print( credentials) - topology.stream.name - print( '------------' + topology.stream.name + " " + str(credentials)) - if isinstance(credentials, dict): - hdfs_uri, user, password = _read_service_credentials(credentials) - _op.params['hdfsUri'] = hdfs_uri - _op.params['hdfsUser'] = user - _op.params['hdfsPassword'] = password - #check if the credentials is a valid JSON string - elif _is_a_valid_json(credentials): - _op.params['credentials'] = credentials - else: - # expect core-site.xml file in credentials param - try: - print( '--------------------------*******+++---' + str(credentials)) - # with open(str(credentials)): - print(credentials) - topology.add_file_dependency(credentials, 'etc') - _op.params['configPath'] = 'etc' - _op.params['credentials'] = None - except IOError: - raise SystemExit("Error: File not accessible.") - + return hdfs_uri, user, password def _setCredentials(LocalCredentials, topology): @@ -117,7 +82,7 @@ def _setCredentials(LocalCredentials, topology): except IOError: raise ValueError(LocalCredentials) else: - if isinstance(credentials, dict): + if isinstance(LocalCredentials, dict): hdfsUri, hdfsUser, hdfsPassword = _read_service_credentials(LocalCredentials) else: credentials=LocalCredentials