diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 65238dfae012c7..0b7b056b69d3a2 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -420,6 +420,11 @@ def load_file(
             pvals = ", ".join(
                 ["{0}='{1}'".format(k, v) for k, v in partition.items()])
             hql += "PARTITION ({pvals});"
+
+        # As a workaround for HIVE-10541, add a newline character
+        # at the end of hql (AIRFLOW-2412).
+        hql += '\n'
+
         hql = hql.format(**locals())
         self.log.info(hql)
         self.run_cli(hql)
diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py
index f48bed84487f52..c5378314609711 100644
--- a/tests/hooks/test_hive_hook.py
+++ b/tests/hooks/test_hive_hook.py
@@ -20,12 +20,14 @@
 import datetime
 import random
+
+import mock
 import unittest

 from hmsclient import HMSClient

 from airflow.exceptions import AirflowException
-from airflow.hooks.hive_hooks import HiveMetastoreHook
+from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook
 from airflow import DAG, configuration, operators
 from airflow.utils import timezone
@@ -82,6 +84,28 @@ def tearDown(self):
             metastore.drop_table(self.database, self.table, deleteData=True)


+class TestHiveCliHook(unittest.TestCase):
+
+    def test_run_cli(self):
+        hook = HiveCliHook()
+        hook.run_cli("SHOW DATABASES")
+
+    @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.run_cli')
+    def test_load_file(self, mock_run_cli):
+        filepath = "/path/to/input/file"
+        table = "output_table"
+
+        hook = HiveCliHook()
+        hook.load_file(filepath=filepath, table=table, create=False)
+
+        query = (
+            "LOAD DATA LOCAL INPATH '{filepath}' "
+            "OVERWRITE INTO TABLE {table} \n"
+            .format(filepath=filepath, table=table)
+        )
+        mock_run_cli.assert_called_with(query)
+
+
 class TestHiveMetastoreHook(HiveEnvironmentTest):

     VALID_FILTER_MAP = {'key2': 'value2'}
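
For illustration, a minimal sketch of the string that the patched load_file() hands to run_cli() for the call exercised in test_load_file above. It reproduces only the query-building step; the real hook also handles create/recreate, field and delimiter options, and partition clauses, which are omitted here. The values of filepath and table come from the test; the only behavior assumed beyond the diff is that OVERWRITE is emitted by default, which the expected query in the test implies.

# Illustrative sketch only -- reproduces the query-building step of
# HiveCliHook.load_file() for the filepath/table used in the test above.
filepath = "/path/to/input/file"
table = "output_table"

hql = "LOAD DATA LOCAL INPATH '{filepath}' "
hql += "OVERWRITE "                 # overwrite is on by default per the test's expected query
hql += "INTO TABLE {table} "
# Workaround for HIVE-10541 (AIRFLOW-2412): terminate the statement
# with a newline before handing it to the Hive CLI.
hql += '\n'
hql = hql.format(**locals())

assert hql == ("LOAD DATA LOCAL INPATH '/path/to/input/file' "
               "OVERWRITE INTO TABLE output_table \n")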