From c3d64fef7de85d77612bafd637a122c888c11ab0 Mon Sep 17 00:00:00 2001 From: calvin Date: Fri, 27 May 2016 10:16:53 +0800 Subject: [PATCH 1/6] add some comments and test cases --- hive/__init__.py | 7 ++- hive/executor.py | 97 ++++++++++++++++++++++++++---------------- setup.cfg | 3 ++ setup.py | 57 ++++++++++++------------- tests/test_executor.py | 4 ++ 5 files changed, 100 insertions(+), 68 deletions(-) create mode 100644 setup.cfg diff --git a/hive/__init__.py b/hive/__init__.py index 33013b9..bd5ec0b 100644 --- a/hive/__init__.py +++ b/hive/__init__.py @@ -1,6 +1,9 @@ __author__ = 'Hua Jiang' __versioninfo__ = (1, 0, 0) __version__ = '.'.join(map(str, __versioninfo__)) +__title__ = 'hive-executor-py' -__all__ = [ - ] \ No newline at end of file +from .executor import HiveExecutor, CommandResult +from .exceptions import HiveCommandExecuteError +from .exceptions import HiveUnfoundError +from .exceptions import SystemCommandExecuteError diff --git a/hive/executor.py b/hive/executor.py index 91d5681..083f638 100644 --- a/hive/executor.py +++ b/hive/executor.py @@ -2,7 +2,11 @@ # Author: Hua Jiang # Date: 2016-04-23 # Desc: -# +# This module includes two classes below. +# An instance of the `CommandResult` class +# may be looked as a DTO(Data Transfer Object).Some of methods in the `HiveExecutor` +# class return result that is an instance of the `CommandResult` class. +# You can use straight the object of `HiveExecutor` class to operate hive. ###################################### import os @@ -11,14 +15,11 @@ import logging from collections import OrderedDict from hive.exceptions import (HiveUnfoundError, - SystemCommandExecuteError, - HiveCommandExecuteError, - ) + HiveCommandExecuteError) _logger = logging.getLogger(__name__) - class CommandResult(object): """The class will be used to stored result of some methods in HivExecutor. @@ -46,7 +47,7 @@ class HiveExecutor(object): """HiveExecutor may be looked as a wrapper of HiveCLI. You can create an object of HiveExecutor,then execute most of Hive commands - by the class methods.When you instantiate the class,firstly check if hive + by the class methods.When you instantiate the class,firstly check if hive client is available.So you can use it as a substitute for Hive Client. Attributes @@ -73,16 +74,16 @@ class HiveExecutor(object): Examples -------- - >>> import HiveExecutor - >>> hive=HiveExecutor("hive") - >>> databases=hive.show_databases() + >>> from hive import HiveExecutor + >>> client=HiveExecutor("hive") + >>> databases=client.show_databases() >>> print(databases) ['default', 'test'] - >>> databases=hive.show_databases('defau*') + >>> databases=client.show_databases('defau*') >>> print(databases) ['default'] - >>> tables=hive.show_tables('default') + >>> tables=client.show_tables('default') ['table1', 'table2'] """ @@ -127,8 +128,8 @@ def has_table(self, db_name, table_name): return False def show_tables(self, db_name, like_parttern=None): - - if like_parttern: + "substitute for `show tables`" + if like_parttern: like_parttern = "like '%s'" % (like_parttern) else: like_parttern = "" @@ -145,8 +146,8 @@ def show_tables(self, db_name, like_parttern=None): raise HiveCommandExecuteError( "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) - def show_partitions(self, db_name, table_name, search_partitions=None): + "substitute for `show partitions`" explicit_partition = "" if search_partitions: if isinstance(search_partitions, OrderedDict): @@ -161,7 +162,8 @@ def show_partitions(self, db_name, table_name, search_partitions=None): raise ValueError( "The passed argument partition must be OrderedDict type.") - hive_sql = "use %s;show partitions %s %s;" % (db_name, table_name, explicit_partition) + hive_sql = "use %s;show partitions %s %s;" % ( + db_name, table_name, explicit_partition) _logger.debug("the function show_partitions() execute:%s" % (hive_sql)) @@ -176,8 +178,8 @@ def show_partitions(self, db_name, table_name, search_partitions=None): return None - def show_functions(self): + "substitute for `show functions`" hive_sql = "show functions;" _logger.debug("executed hive sql:%s" % (hive_sql)) @@ -199,6 +201,7 @@ def has_function(self, function_name): return False def show_create_table(self, db_name, table_name): + "substitute for `show create table`" hive_sql = "show create table %s.%s;" % (db_name, table_name) _logger.debug("executed hive sql:%s" % (hive_sql)) @@ -212,6 +215,7 @@ def show_create_table(self, db_name, table_name): "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) def desc_table(self, db_name, table_name, extended=False): + "substitute for `desc table_name`" hive_sql = "desc %s.%s;" % (db_name, table_name) _logger.debug("executed hive sql:%s" % (hive_sql)) @@ -241,17 +245,16 @@ def _parse_table(self, text): continue line = line.strip() - if len(line)==0: + if len(line) == 0: continue - field_info = {} - elements=None - m=re.match('(\S+)\s+(\S+)\s+(.*)',line) + elements = None + m = re.match('(\S+)\s+(\S+)\s+(.*)', line) if m is None: - m=re.match('(\S+)\s+(\S+)',line) + m = re.match('(\S+)\s+(\S+)', line) if m: - elements=m.groups() + elements = m.groups() if elements: if partition_info_flag: @@ -260,13 +263,14 @@ def _parse_table(self, text): fields_part_info.append(elements) return table_info - def show_databases(self,like_parttern=None): + def show_databases(self, like_parttern=None): + "substitute for `show databases`" if like_parttern: like_parttern = "like '%s'" % (like_parttern) else: like_parttern = "" - hive_sql="show databases %s;" %(like_parttern) + hive_sql = "show databases %s;" % (like_parttern) _logger.debug("executed hive sql:%s" % (hive_sql)) cr = self.execute(sql=hive_sql) @@ -278,9 +282,10 @@ def show_databases(self,like_parttern=None): raise HiveCommandExecuteError( "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) - def desc_database(self, db_name, extended=False): - hive_sql = "set hive.cli.print.header=true;desc database %s;" % (db_name) + "substitute for `desc database`" + hive_sql = "set hive.cli.print.header=true;desc database %s;" % ( + db_name) _logger.debug("executed hive sql:%s" % (hive_sql)) cr = self.execute(sql=hive_sql) @@ -292,18 +297,20 @@ def desc_database(self, db_name, extended=False): raise HiveCommandExecuteError( "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) - - def _parse_database(self,text): + def _parse_database(self, text): + "unsupported" return text - def desc_function(self, db_name, table_name, extended=False): + "unsupported" pass - def desc_formatted_table(self,db_name,table_name): + def desc_formatted_table(self, db_name, table_name): + "unsupported" pass def show_roles(self): + "substitute for `show roles`" hive_sql = "show roles;" _logger.debug("executed hive sql:%s" % (hive_sql)) @@ -317,9 +324,11 @@ def show_roles(self): "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) def drop_table(self, db_name, table_name): + "unsupported" pass def create_table(self, create_table_sql): + "unsupported" pass def _parse_partitions(self, text): @@ -356,14 +365,28 @@ def _execute_system_command(self, command): return CommandResult(output, err, status) def execute(self, variable_substitution=None, init_sql_file=None, sql_file=None, sql=None, output_file=None): + """this method can be used to execute hive cliet commands. + + Parameters + ---------- + variable_substitution : Optional[str] + The parameter that contains key-value pairs is a dict type variant. + init_sql_file : Optional[str] + The path of the initialization sql file + sql_file : Optional[str] + The path of the hive sql + sql : Optional[str] + The hive sql,if this parameter is required,the parameter of the sql_file will be disable. + output_file : Optional[str] + When passed the parameter 'sql',this parameter can be used. + + Raises + ------ + ValueError + The parameters violate the rules below. + """ - Parameters: - variable_substitution:The parameter that contains key-value pairs is a dict type variant. - init_sql_file:the path of the initialization sql file - sql_file:the path of the hive sql - sql:the hive sql,if this parameter is required,the parameter of the sql_file will be disable. - output_file:when passed the parameter 'sql',this parameter can be used. - """ + # validate the parameters if variable_substitution and not isinstance(variable_substitution, dict): raise ValueError( diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..8dc7843 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,3 @@ +[bdist_wheel] + +universal=1 diff --git a/setup.py b/setup.py index efcc9ec..a332d80 100644 --- a/setup.py +++ b/setup.py @@ -3,32 +3,31 @@ https://packaging.python.org/en/latest/distributing.html https://github.com/pypa/sampleproject """ +#!/usr/bin/env python # Always prefer setuptools over distutils -from setuptools import setup, find_packages +from setuptools import setup # To use a consistent encoding -from codecs import open -from os import path -here = path.abspath(path.dirname(__file__)) +#here = path.abspath(path.dirname(__file__)) # Get the long description from the README file -with open(path.join(here, 'README.rst'), encoding='utf-8') as f: - long_description = f.read() +# with open(path.join(here, 'README.rst'), encoding='utf-8') as f: +# long_description = f.read() setup( - name='hive', + name='hive-executor-py', # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # https://packaging.python.org/en/latest/single_source_version.html - version='1.0.0', + version='1.0.1.dev1', description='A hive client python project', - long_description=long_description, + #long_description=long_description, # The project's main homepage. - url='https://github.com/pypa/sampleproject', + url='https://github.com/calvinjiang/hive-executor-py', # Author details author='Calvin Jiang', @@ -43,7 +42,7 @@ # 3 - Alpha # 4 - Beta # 5 - Production/Stable - 'Development Status :: 3 - Alpha', + 'Development Status :: 4 - Beta', # Indicate who your project is intended for 'Intended Audience :: Developers', @@ -54,7 +53,7 @@ # Specify the Python versions you support here. In particular, ensure # that you indicate whether you support Python 2, Python 3 or both. - 'Programming Language :: Python :: 2', + #'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2.6', 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', @@ -64,11 +63,11 @@ ], # What does your project relate to? - keywords='sample setuptools development', + keywords='hive client python', # You can just specify the packages manually here if your project is # simple. Or you can use find_packages(). - packages=find_packages(exclude=['contrib', 'docs', 'tests']), + packages=['hive'], # Alternatively, if you want to distribute just a my_module.py, uncomment # this: @@ -78,36 +77,36 @@ # your project is installed. For an analysis of "install_requires" vs pip's # requirements files see: # https://packaging.python.org/en/latest/requirements.html - install_requires=['peppercorn'], + #install_requires=['peppercorn'], # List additional groups of dependencies here (e.g. development # dependencies). You can install these using the following syntax, # for example: # $ pip install -e .[dev,test] - extras_require={ - 'dev': ['check-manifest'], - 'test': ['coverage'], - }, + #extras_require={ + # 'dev': ['check-manifest'], + # 'test': ['coverage'], + #}, # If there are data files included in your packages that need to be # installed, specify them here. If using Python 2.6 or less, then these # have to be included in MANIFEST.in as well. - package_data={ - 'sample': ['package_data.dat'], - }, + #package_data={ + # 'sample': ['package_data.dat'], + #}, # Although 'package_data' is the preferred approach, in some case you may # need to place data files outside of your packages. See: # http://docs.python.org/3.4/distutils/setupscript.html#installing-additional-files # noqa # In this case, 'data_file' will be installed into '/my_data' - data_files=[('my_data', ['data/data_file'])], + #data_files=[('my_data', ['data/data_file'])], # To provide executable scripts, use entry points in preference to the # "scripts" keyword. Entry points provide cross-platform support and allow # pip to create the appropriate form of executable for the target platform. - entry_points={ - 'console_scripts': [ - 'sample=sample:main', - ], - }, -) \ No newline at end of file + #entry_points={ + # 'console_scripts': [ + # 'sample=sample:main', + # ], + #}, +) diff --git a/tests/test_executor.py b/tests/test_executor.py index e3f6a08..84ef30b 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -72,6 +72,10 @@ def test_show_tables(self): self.assertRaises(HiveCommandExecuteError, self.executor.show_tables, "test_db_name") + def test_show_databases(self): + if self.hive_enable: + self.assertEqual([],HiveCommandExecuteError,self.executor.show_databases("notexists_db_name")) + def tearDown(self): self.executor = None From eea7fd5fc911d22f30084bfe606df734289d17af Mon Sep 17 00:00:00 2001 From: calvin Date: Fri, 27 May 2016 10:51:01 +0800 Subject: [PATCH 2/6] modify setup.py --- hive/__init__.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hive/__init__.py b/hive/__init__.py index bd5ec0b..91ac823 100644 --- a/hive/__init__.py +++ b/hive/__init__.py @@ -1,5 +1,5 @@ __author__ = 'Hua Jiang' -__versioninfo__ = (1, 0, 0) +__versioninfo__ = (1, 0, 2) __version__ = '.'.join(map(str, __versioninfo__)) __title__ = 'hive-executor-py' diff --git a/setup.py b/setup.py index a332d80..9e5b6f5 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # https://packaging.python.org/en/latest/single_source_version.html - version='1.0.1.dev1', + version='1.0.2.dev1', description='A hive client python project', #long_description=long_description, From 79db155ec5c670c1573c16b0bcb9b0c4688f5bf6 Mon Sep 17 00:00:00 2001 From: calvin Date: Mon, 6 Jun 2016 13:08:15 +0800 Subject: [PATCH 3/6] add methods in hive/executory.py --- hive/executor.py | 188 ++++++++++++++++++++++++++++++++--------- tests/test_data.py | 23 +++++ tests/test_executor.py | 28 +++++- 3 files changed, 199 insertions(+), 40 deletions(-) diff --git a/hive/executor.py b/hive/executor.py index 083f638..5fda55b 100644 --- a/hive/executor.py +++ b/hive/executor.py @@ -119,33 +119,6 @@ def last_partitions(self, db_name, table_name): else: return [] - def has_table(self, db_name, table_name): - tables = self.show_tables(db_name) - for table in tables: - if table == table_name: - return True - - return False - - def show_tables(self, db_name, like_parttern=None): - "substitute for `show tables`" - if like_parttern: - like_parttern = "like '%s'" % (like_parttern) - else: - like_parttern = "" - - hive_sql = "use %s;show tables %s;" % (db_name, like_parttern) - - _logger.debug("executed hive sql:%s" % (hive_sql)) - cr = self.execute(sql=hive_sql) - - if cr.status == 0: - return self._parse_lines_to_sequence(cr.stdout_text) - else: - _logger.error(cr.stderr_text) - raise HiveCommandExecuteError( - "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) - def show_partitions(self, db_name, table_name, search_partitions=None): "substitute for `show partitions`" explicit_partition = "" @@ -178,6 +151,89 @@ def show_partitions(self, db_name, table_name, search_partitions=None): return None + def add_partitions(self,db_name,table_name,partitions): + "substitute for `alter table db_name.table_name add if not exists partition(dt='',hour='');`" + hive_sql = "alter table %s.%s add if not exists \n" % (db_name,table_name) + + built_partitions=self._build_partitions(partitions) + + if len(built_partitions)>0: + hive_sql=hive_sql+"\n".join(built_partitions)+";" + + _logger.debug("executed hive sql:%s" % (hive_sql)) + cr = self.execute(sql=hive_sql) + + if cr.status == 0: + return True + else: + _logger.error(cr.stderr_text) + raise HiveCommandExecuteError( + "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) + return False + + def drop_partitions(self,db_name,table_name,partitions): + """ + substitute for `alter table db_name.table_name drop if exists + partition(dt='',hour='');` + """ + hive_sql = "alter table %s.%s drop if exists \n" % (db_name,table_name) + + built_partitions=self._build_partitions(partitions) + + if len(built_partitions)>0: + hive_sql=hive_sql+"\n".join(built_partitions)+";" + + _logger.debug("executed hive sql:%s" % (hive_sql)) + cr = self.execute(sql=hive_sql) + + if cr.status == 0: + return True + else: + _logger.error(cr.stderr_text) + raise HiveCommandExecuteError( + "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) + return False + + def _build_partitions(self,partitions): + built_partitions=[] + for partition in partitions: + values=[] + for key,value in partition.items(): + if isinstance(value,int): + values.append("%s=%s" %(key,value)) + else: + values.append("%s='%s'" %(key,value)) + built_partitions.append("partition(%s)" %(",".join(values))) + return built_partitions + + def has_table(self, db_name, table_name): + tables = self.show_tables(db_name) + for table in tables: + if table == table_name: + return True + + return False + + def show_tables(self, db_name, like_parttern=None): + "substitute for `show tables`" + if like_parttern: + like_parttern = "like '%s'" % (like_parttern) + else: + like_parttern = "" + + hive_sql = "use %s;show tables %s;" % (db_name, like_parttern) + + _logger.debug("executed hive sql:%s" % (hive_sql)) + cr = self.execute(sql=hive_sql) + + if cr.status == 0: + return self._parse_lines_to_sequence(cr.stdout_text) + else: + _logger.error(cr.stderr_text) + raise HiveCommandExecuteError( + "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) + + def show_functions(self): "substitute for `show functions`" hive_sql = "show functions;" @@ -282,7 +338,7 @@ def show_databases(self, like_parttern=None): raise HiveCommandExecuteError( "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) - def desc_database(self, db_name, extended=False): + def desc_database(self, db_name): "substitute for `desc database`" hive_sql = "set hive.cli.print.header=true;desc database %s;" % ( db_name) @@ -298,14 +354,23 @@ def desc_database(self, db_name, extended=False): "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) def _parse_database(self, text): - "unsupported" - return text + db_info = {} + if text: + lines = text.strip().split("\n") + if len(lines) == 2: + header_names = re.split("\s+", lines[0]) + values = re.split("\s+", lines[1]) + for idx, value in enumerate(header_names): + if len(values) > idx: + if value=="comment" and values[idx].startswith("hdfs://"): + values.insert(idx,"") + db_info[value] = values[idx] + else: + db_info[value] = "" - def desc_function(self, db_name, table_name, extended=False): - "unsupported" - pass + return db_info - def desc_formatted_table(self, db_name, table_name): + def desc_function(self, db_name, table_name, extended=False): "unsupported" pass @@ -324,12 +389,56 @@ def show_roles(self): "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) def drop_table(self, db_name, table_name): - "unsupported" - pass + "substitute for `drop table db_name.table_name`" + hive_sql = "drop table if exists %s.%s;" %(db_name,table_name) + + _logger.debug("executed hive sql:%s" % (hive_sql)) + cr = self.execute(sql=hive_sql) + + if cr.status == 0: + return True + else: + _logger.error(cr.stderr_text) + raise HiveCommandExecuteError( + "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) + return False + + def load_data(self,inpath,db,table_name,partitions=None,local=True,overwrite=True): + """ + substitute for `load data [local] inpath '' + [overwrite] into TABLE table_name + partition (dt='20160501',hour='12')` + """ + hive_sql="" + if local: + hive_sql = "load data local inpath '%s.%s'" %(inpath) + else: + hive_sql = "load data inpath '%s.%s'" %(inpath) + + if overwrite: + hive_sql="%s overwrite into table %s.%s" %(hive_sql,db,table_name) + else: + hive_sql="%s into table %s.%s" %(hive_sql,db,table_name) + + if partitions: + partition_seq=[] + for key,value in partitions.items(): + partition_seq.append("%s='%s'" %(key,value)) + hive_sql="%s partition (%s);" %(hive_sql,",".join(partition_seq)) + else: + hive_sql="%s;" %(hive_sql) + + _logger.debug("executed hive sql:%s" % (hive_sql)) + cr = self.execute(sql=hive_sql) + + if cr.status == 0: + return True + else: + _logger.error(cr.stderr_text) + raise HiveCommandExecuteError( + "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) + return False - def create_table(self, create_table_sql): - "unsupported" - pass def _parse_partitions(self, text): partitions = [] @@ -364,6 +473,7 @@ def _execute_system_command(self, command): status = process.poll() return CommandResult(output, err, status) + def execute(self, variable_substitution=None, init_sql_file=None, sql_file=None, sql=None, output_file=None): """this method can be used to execute hive cliet commands. diff --git a/tests/test_data.py b/tests/test_data.py index bcd6b65..cf66330 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -1,4 +1,5 @@ # coding=UTF-8 +from collections import OrderedDict HIVE_PARTITIONS_TEST_DATA="""dt=20160508/hour=00/type=click dt=20160508/hour=00/type=deliver @@ -248,3 +249,25 @@ HIVE_DESC_DATABASE_TEST_RESULT={"db_name":"adm","comment":"","location":"hdfs://AutoLqCluster/group_sjpt/hive_db/adm.db","owner_name":"root","owner_type":"USER","parameters":""} +HIVE_BUILD_PARTITIONS_TEST_DATA=[] +partition=OrderedDict() +partition['dt']='20160501' +partition['hour']='00' + +HIVE_BUILD_PARTITIONS_TEST_DATA.append(partition) + +partition=OrderedDict() +partition['dt']=20160502 +partition['hour']=1 +HIVE_BUILD_PARTITIONS_TEST_DATA.append(partition) + +HIVE_BUILD_PARTITIONS_TEST_RESULT=["partition(dt='20160501',hour='00')","partition(dt=20160502,hour=1)"] + +HIVE_CREATE_TABLE_TEST_SQL=""" +create table if not exists default.test_table_123 +( +name string comment 'name', +age int comment 'age' +) +partitioned by(dt string,hour string); +""" diff --git a/tests/test_executor.py b/tests/test_executor.py index 84ef30b..61cb819 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -64,6 +64,13 @@ def test_parse_table(self): self.assertEqual(result, test_data.HIVE_DESC_TABLE_TEST_RESULT, "the method HiveExecutor._parse_table() failed!") + def test_parse_database(self): + result = self.executor._parse_database( + test_data.HIVE_DESC_DATABASE_TEST_DATA) + + self.assertEqual(result, test_data.HIVE_DESC_DATABASE_TEST_RESULT, + "the method HiveExecutor._parse_database() failed!") + def test_show_partitions(self): self.assertRaises(HiveCommandExecuteError, self.executor.show_partitions, "test_db_name", "test_table_name") @@ -74,7 +81,26 @@ def test_show_tables(self): def test_show_databases(self): if self.hive_enable: - self.assertEqual([],HiveCommandExecuteError,self.executor.show_databases("notexists_db_name")) + self.assertEqual([], HiveCommandExecuteError, + self.executor.show_databases("notexists_db_name")) + + def test_drop_table(self): + if self.hive_enable: + self.assertEqual(True, self.executor.drop_table( + "test_db_name", "test_table_name")) + else: + self.assertRaises( + HiveCommandExecuteError, self.executor.drop_table, "test_db_name", "test_table_name") + + def test_build_partitions(self): + result = self.executor._build_partitions( + test_data.HIVE_BUILD_PARTITIONS_TEST_DATA) + + self.assertEqual(result, test_data.HIVE_BUILD_PARTITIONS_TEST_RESULT, + "the method HiveExecutor._build_partitions() failed!") + + self.assertEqual([], self.executor._build_partitions([]), + "the method HiveExecutor._build_partitions() failed!") def tearDown(self): self.executor = None From e05396053f5830565f5ba76159655b45c4825d95 Mon Sep 17 00:00:00 2001 From: calvin Date: Thu, 16 Jun 2016 13:34:47 +0800 Subject: [PATCH 4/6] add a required module of the common-utils. --- hive/executor.py | 45 +++++++++++------------------------------- setup.py | 4 ++-- tests/test_executor.py | 6 ++++-- 3 files changed, 17 insertions(+), 38 deletions(-) diff --git a/hive/executor.py b/hive/executor.py index 5fda55b..3459d38 100644 --- a/hive/executor.py +++ b/hive/executor.py @@ -10,37 +10,21 @@ ###################################### import os +import sys import subprocess import re import logging from collections import OrderedDict +from utils.calendar import Calendar +from utils.cmd import CommandExecutor +from utils.cmd import CommandResult + from hive.exceptions import (HiveUnfoundError, HiveCommandExecuteError) -_logger = logging.getLogger(__name__) - -class CommandResult(object): - """The class will be used to stored result of some methods in HivExecutor. - - Up to now the class only be used in the method of _execute_system_command - in the class HiveExecutor. - - Attributes - ---------- - stdout_text : str - The result of standart out. - stderr_text : str - The result of standart error. - status : int - The status of system command returns. - - """ +_logger = logging.getLogger(__name__) - def __init__(self, stdout_text, stderr_text, status): - self.stdout_text = stdout_text - self.stderr_text = stderr_text - self.status = status class HiveExecutor(object): @@ -94,7 +78,7 @@ def __init__(self, hive_cmd_path="hive", verbose=False): "When you passed the argument of hive_cmd_path,it should have a value.") cmd = "which %s" % (hive_cmd_path) - result = self._execute_system_command(cmd) + result = CommandExecutor.system(cmd) if result.status != 0: raise HiveUnfoundError( @@ -411,9 +395,9 @@ def load_data(self,inpath,db,table_name,partitions=None,local=True,overwrite=Tru """ hive_sql="" if local: - hive_sql = "load data local inpath '%s.%s'" %(inpath) + hive_sql = "load data local inpath '%s'" %(inpath) else: - hive_sql = "load data inpath '%s.%s'" %(inpath) + hive_sql = "load data inpath '%s'" %(inpath) if overwrite: hive_sql="%s overwrite into table %s.%s" %(hive_sql,db,table_name) @@ -465,14 +449,7 @@ def _parse_lines_to_sequence(self, text): return tables - def _execute_system_command(self, command): - status = 0 - process = subprocess.Popen( - command, shell=True, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - output, err = process.communicate() - status = process.poll() - return CommandResult(output, err, status) - + def execute(self, variable_substitution=None, init_sql_file=None, sql_file=None, sql=None, output_file=None): """this method can be used to execute hive cliet commands. @@ -549,6 +526,6 @@ def execute(self, variable_substitution=None, init_sql_file=None, sql_file=None, _logger.info("the function execute:%s" % (execute_cmd)) - cr = self._execute_system_command(execute_cmd) + cr = CommandExecutor.system(execute_cmd) return cr diff --git a/setup.py b/setup.py index 9e5b6f5..26bcaad 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # https://packaging.python.org/en/latest/single_source_version.html - version='1.0.2.dev1', + version='1.0.3.dev1', description='A hive client python project', #long_description=long_description, @@ -77,7 +77,7 @@ # your project is installed. For an analysis of "install_requires" vs pip's # requirements files see: # https://packaging.python.org/en/latest/requirements.html - #install_requires=['peppercorn'], + install_requires=['common-utils'], # List additional groups of dependencies here (e.g. development # dependencies). You can install these using the following syntax, diff --git a/tests/test_executor.py b/tests/test_executor.py index 61cb819..5d18eed 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -5,9 +5,11 @@ import os import logging import logging.config +from utils.cmd import CommandResult +from utils.cmd import CommandExecutor + sys.path.append("..") from hive.executor import HiveExecutor -from hive.executor import CommandResult from hive.exceptions import HiveUnfoundError from hive.exceptions import HiveCommandExecuteError import test_data @@ -38,7 +40,7 @@ def setUp(self): self.executor = HiveExecutor("hive") def test_execute_system_command(self): - rc = self.executor._execute_system_command("ls ") + rc = CommandExecutor.system("ls ") self.assertTrue(isinstance(rc, CommandResult), "return value type error,not a CommandResult instance.") From f6a4a1f79b71ae6622b96e0409676870aec95983 Mon Sep 17 00:00:00 2001 From: calvin Date: Tue, 5 Jul 2016 15:10:08 +0800 Subject: [PATCH 5/6] add init_settings parameter in HiveExecutor's constructor. --- hive/__init__.py | 2 +- hive/executor.py | 83 +++++++++++++++++++++++++----------------------- 2 files changed, 44 insertions(+), 41 deletions(-) diff --git a/hive/__init__.py b/hive/__init__.py index 91ac823..cc60eb7 100644 --- a/hive/__init__.py +++ b/hive/__init__.py @@ -1,5 +1,5 @@ __author__ = 'Hua Jiang' -__versioninfo__ = (1, 0, 2) +__versioninfo__ = (1, 0, 3) __version__ = '.'.join(map(str, __versioninfo__)) __title__ = 'hive-executor-py' diff --git a/hive/executor.py b/hive/executor.py index 3459d38..8c2efbb 100644 --- a/hive/executor.py +++ b/hive/executor.py @@ -26,7 +26,6 @@ _logger = logging.getLogger(__name__) - class HiveExecutor(object): """HiveExecutor may be looked as a wrapper of HiveCLI. @@ -72,7 +71,7 @@ class HiveExecutor(object): """ - def __init__(self, hive_cmd_path="hive", verbose=False): + def __init__(self, hive_cmd_path="hive", hive_init_settings=[], verbose=False): if hive_cmd_path is None or len(hive_cmd_path) == 0: raise ValueError( "When you passed the argument of hive_cmd_path,it should have a value.") @@ -86,6 +85,7 @@ def __init__(self, hive_cmd_path="hive", verbose=False): self.hive_cmd_path = hive_cmd_path self.enable_verbose_mode = verbose + self.hive_init_settings = hive_init_settings self.__default_hive_command = self.hive_cmd_path + " -S " def has_partitions(self, db_name, table_name, check_partitions): @@ -135,14 +135,15 @@ def show_partitions(self, db_name, table_name, search_partitions=None): return None - def add_partitions(self,db_name,table_name,partitions): + def add_partitions(self, db_name, table_name, partitions): "substitute for `alter table db_name.table_name add if not exists partition(dt='',hour='');`" - hive_sql = "alter table %s.%s add if not exists \n" % (db_name,table_name) + hive_sql = "alter table %s.%s add if not exists \n" % ( + db_name, table_name) - built_partitions=self._build_partitions(partitions) + built_partitions = self._build_partitions(partitions) - if len(built_partitions)>0: - hive_sql=hive_sql+"\n".join(built_partitions)+";" + if len(built_partitions) > 0: + hive_sql = hive_sql + "\n".join(built_partitions) + ";" _logger.debug("executed hive sql:%s" % (hive_sql)) cr = self.execute(sql=hive_sql) @@ -155,17 +156,18 @@ def add_partitions(self,db_name,table_name,partitions): "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) return False - def drop_partitions(self,db_name,table_name,partitions): + def drop_partitions(self, db_name, table_name, partitions): """ substitute for `alter table db_name.table_name drop if exists partition(dt='',hour='');` """ - hive_sql = "alter table %s.%s drop if exists \n" % (db_name,table_name) + hive_sql = "alter table %s.%s drop if exists \n" % ( + db_name, table_name) + + built_partitions = self._build_partitions(partitions) - built_partitions=self._build_partitions(partitions) - - if len(built_partitions)>0: - hive_sql=hive_sql+"\n".join(built_partitions)+";" + if len(built_partitions) > 0: + hive_sql = hive_sql + "\n".join(built_partitions) + ";" _logger.debug("executed hive sql:%s" % (hive_sql)) cr = self.execute(sql=hive_sql) @@ -178,16 +180,16 @@ def drop_partitions(self,db_name,table_name,partitions): "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) return False - def _build_partitions(self,partitions): - built_partitions=[] + def _build_partitions(self, partitions): + built_partitions = [] for partition in partitions: - values=[] - for key,value in partition.items(): - if isinstance(value,int): - values.append("%s=%s" %(key,value)) + values = [] + for key, value in partition.items(): + if isinstance(value, int): + values.append("%s=%s" % (key, value)) else: - values.append("%s='%s'" %(key,value)) - built_partitions.append("partition(%s)" %(",".join(values))) + values.append("%s='%s'" % (key, value)) + built_partitions.append("partition(%s)" % (",".join(values))) return built_partitions def has_table(self, db_name, table_name): @@ -217,7 +219,6 @@ def show_tables(self, db_name, like_parttern=None): raise HiveCommandExecuteError( "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) - def show_functions(self): "substitute for `show functions`" hive_sql = "show functions;" @@ -346,8 +347,8 @@ def _parse_database(self, text): values = re.split("\s+", lines[1]) for idx, value in enumerate(header_names): if len(values) > idx: - if value=="comment" and values[idx].startswith("hdfs://"): - values.insert(idx,"") + if value == "comment" and values[idx].startswith("hdfs://"): + values.insert(idx, "") db_info[value] = values[idx] else: db_info[value] = "" @@ -374,7 +375,7 @@ def show_roles(self): def drop_table(self, db_name, table_name): "substitute for `drop table db_name.table_name`" - hive_sql = "drop table if exists %s.%s;" %(db_name,table_name) + hive_sql = "drop table if exists %s.%s;" % (db_name, table_name) _logger.debug("executed hive sql:%s" % (hive_sql)) cr = self.execute(sql=hive_sql) @@ -387,30 +388,32 @@ def drop_table(self, db_name, table_name): "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) return False - def load_data(self,inpath,db,table_name,partitions=None,local=True,overwrite=True): + def load_data(self, inpath, db, table_name, partitions=None, local=True, overwrite=True): """ substitute for `load data [local] inpath '' [overwrite] into TABLE table_name partition (dt='20160501',hour='12')` """ - hive_sql="" + hive_sql = "" if local: - hive_sql = "load data local inpath '%s'" %(inpath) + hive_sql = "load data local inpath '%s'" % (inpath) else: - hive_sql = "load data inpath '%s'" %(inpath) + hive_sql = "load data inpath '%s'" % (inpath) if overwrite: - hive_sql="%s overwrite into table %s.%s" %(hive_sql,db,table_name) + hive_sql = "%s overwrite into table %s.%s" % ( + hive_sql, db, table_name) else: - hive_sql="%s into table %s.%s" %(hive_sql,db,table_name) + hive_sql = "%s into table %s.%s" % (hive_sql, db, table_name) if partitions: - partition_seq=[] - for key,value in partitions.items(): - partition_seq.append("%s='%s'" %(key,value)) - hive_sql="%s partition (%s);" %(hive_sql,",".join(partition_seq)) + partition_seq = [] + for key, value in partitions.items(): + partition_seq.append("%s='%s'" % (key, value)) + hive_sql = "%s partition (%s);" % ( + hive_sql, ",".join(partition_seq)) else: - hive_sql="%s;" %(hive_sql) + hive_sql = "%s;" % (hive_sql) _logger.debug("executed hive sql:%s" % (hive_sql)) cr = self.execute(sql=hive_sql) @@ -423,7 +426,6 @@ def load_data(self,inpath,db,table_name,partitions=None,local=True,overwrite=Tru "the hive command:%s error! error info:%s" % (hive_sql, cr.stderr_text)) return False - def _parse_partitions(self, text): partitions = [] if text: @@ -449,8 +451,6 @@ def _parse_lines_to_sequence(self, text): return tables - - def execute(self, variable_substitution=None, init_sql_file=None, sql_file=None, sql=None, output_file=None): """this method can be used to execute hive cliet commands. @@ -508,7 +508,10 @@ def execute(self, variable_substitution=None, init_sql_file=None, sql_file=None, hive_sql_file = "" hive_sql = "" if sql: - hive_sql = " -e \"%s\"" % (sql) + if len(self.hive_init_settings)>0: + hive_sql = " -e \"%s;%s\"" % (";".join(self.hive_init_settings),sql) + else: + hive_sql = " -e \"%s\"" % (sql) else: if sql_file: hive_sql_file = " -f %s" % (sql_file) From a08e9d2f2f8d64441df8c4793823b601be3865a7 Mon Sep 17 00:00:00 2001 From: calvin Date: Thu, 11 Aug 2016 14:16:08 +0800 Subject: [PATCH 6/6] modify executor.py --- hive/__init__.py | 2 +- hive/executor.py | 8 ++++++++ setup.py | 2 +- tests/test_executor.py | 5 ++--- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/hive/__init__.py b/hive/__init__.py index cc60eb7..6d4e5fd 100644 --- a/hive/__init__.py +++ b/hive/__init__.py @@ -1,5 +1,5 @@ __author__ = 'Hua Jiang' -__versioninfo__ = (1, 0, 3) +__versioninfo__ = (1, 0, 4) __version__ = '.'.join(map(str, __versioninfo__)) __title__ = 'hive-executor-py' diff --git a/hive/executor.py b/hive/executor.py index 8c2efbb..20ff240 100644 --- a/hive/executor.py +++ b/hive/executor.py @@ -45,6 +45,9 @@ class HiveExecutor(object): hive_cmd_path : Optional[str] The path of hive client. Default is 'hive'. + hive_init_settings : Optional[sequence] + The settings of hive client. + Default is [].An empty sequence. verbose : Optional[bool] Default is False. @@ -68,6 +71,11 @@ class HiveExecutor(object): >>> tables=client.show_tables('default') ['table1', 'table2'] + + >>> init_settings=[] + >>> init_settings.append("set mapred.job.queue.name=your_queue_name") + >>> init_settings.append("set hive.exec.dynamic.partition.mode=nonstrict") + >>> client=HiveExecutor(hive_cmd_path="hive",hive_init_settings=init_settings) """ diff --git a/setup.py b/setup.py index 26bcaad..26ab0fe 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # https://packaging.python.org/en/latest/single_source_version.html - version='1.0.3.dev1', + version='1.0.4.dev1', description='A hive client python project', #long_description=long_description, diff --git a/tests/test_executor.py b/tests/test_executor.py index 5d18eed..6827900 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -15,7 +15,7 @@ import test_data #CONF_LOG = "../conf/logging.conf" -# logging.config.fileConfig(CONF_LOG) +#logging.config.fileConfig(CONF_LOG) logging.basicConfig(level=logging.ERROR, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', @@ -83,8 +83,7 @@ def test_show_tables(self): def test_show_databases(self): if self.hive_enable: - self.assertEqual([], HiveCommandExecuteError, - self.executor.show_databases("notexists_db_name")) + self.assertEqual([], self.executor.show_databases("notexists_db_name")) def test_drop_table(self): if self.hive_enable: