Skip to content

Commit

Permalink
support all CP4D connection types #11
Browse files Browse the repository at this point in the history
  • Loading branch information
anouri committed Apr 23, 2020
1 parent d84b12f commit 10cbe20
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 51 deletions.
2 changes: 1 addition & 1 deletion package/docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
# The short X.Y version.
version = '1.5'
# The full version, including alpha/beta/rc tags.
release = '1.5.0'
release = '1.5.5'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
2 changes: 1 addition & 1 deletion package/streamsx/hdfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
"""

__version__='1.5.0'
__version__='1.5.5'

__all__ = ['HdfsDirectoryScan', 'HdfsFileSink', 'HdfsFileSource', 'HdfsFileCopy', 'download_toolkit', 'configure_connection', 'scan', 'read', 'write']
from streamsx.hdfs._hdfs import download_toolkit, configure_connection, scan, read, write, copy, HdfsDirectoryScan, HdfsFileSink, HdfsFileSource, HdfsFileCopy
63 changes: 14 additions & 49 deletions package/streamsx/hdfs/_hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,60 +41,25 @@ def _add_toolkit_dependency(topo):
# This is important when toolkit is not set with streamsx.spl.toolkit.add_toolkit (selecting toolkit from remote build service)
streamsx.spl.toolkit.add_toolkit_dependency(topo, _TOOLKIT_NAME, '[5.0.0,6.0.0)')

def _read_service_credentials(credentials):
def _read_service_credentials(connection):
hdfs_uri = ""
user = ""
password = ""
if isinstance(credentials, dict):
# check for Analytics Engine service credentials
if 'cluster' in credentials:
user = credentials.get('cluster').get('user')
password = credentials.get('cluster').get('password')
hdfs_uri = credentials.get('cluster').get('service_endpoints').get('webhdfs')
if isinstance(connection, dict):
type = str(connection.get('type'))
# check the connection type
if ('GenericWebHDFS' in type):
user = connection.get('username')
password = connection.get('password')
host_port = connection.get('host') + ':' + str(connection.get('port'))
hdfs_uri = "webhdfs://" + host_port
else:
if 'webhdfs' in credentials:
user = credentials.get('user')
password = credentials.get('password')
hdfs_uri = credentials.get('webhdfs')
else:
raise ValueError(credentials)
host_port = connection.get('host') + ':' + str(connection.get('port'))
hdfs_uri = "hdfs://" + host_port
else:
raise TypeError(credentials)
# construct expected format for hdfs_uri: webhdfs://host:port
uri_parsed = urlparse(hdfs_uri)
hdfs_uri = 'webhdfs://'+uri_parsed.netloc
return hdfs_uri, user, password

def _check_vresion_credentials1(credentials, _op, topology):
if credentials is not None:
raise TypeError(connection)

# check streamsx.hdfs version
_add_toolkit_dependency(topology)
# if credentials is None:
# raise SystemExit("Error: credentials is empty.")
print( credentials)
topology.stream.name
print( '------------' + topology.stream.name + " " + str(credentials))
if isinstance(credentials, dict):
hdfs_uri, user, password = _read_service_credentials(credentials)
_op.params['hdfsUri'] = hdfs_uri
_op.params['hdfsUser'] = user
_op.params['hdfsPassword'] = password
#check if the credentials is a valid JSON string
elif _is_a_valid_json(credentials):
_op.params['credentials'] = credentials
else:
# expect core-site.xml file in credentials param
try:
print( '--------------------------*******+++---' + str(credentials))
# with open(str(credentials)):
print(credentials)
topology.add_file_dependency(credentials, 'etc')
_op.params['configPath'] = 'etc'
_op.params['credentials'] = None
except IOError:
raise SystemExit("Error: File not accessible.")

return hdfs_uri, user, password


def _setCredentials(LocalCredentials, topology):
Expand All @@ -117,7 +82,7 @@ def _setCredentials(LocalCredentials, topology):
except IOError:
raise ValueError(LocalCredentials)
else:
if isinstance(credentials, dict):
if isinstance(LocalCredentials, dict):
hdfsUri, hdfsUser, hdfsPassword = _read_service_credentials(LocalCredentials)
else:
credentials=LocalCredentials
Expand Down

0 comments on commit 10cbe20

Please sign in to comment.