import json

from pyhive import hive
from thrift import Thrift


class HiveConnection:
    """Small wrapper around a PyHive connection that returns query results as JSON.

    The connection is opened lazily: ``execute_query`` calls ``connect`` when no
    connection is currently held. Failures are reported with ``print`` and
    signalled to the caller by a ``None`` return value.
    """

    def __init__(self, host, port=10000, username=None):
        """Store the connection settings; no network activity happens here."""
        self.host = host
        self.port = port
        self.username = username
        # Set by connect(); None while no connection is held.
        self.connection = None

    def connect(self):
        """Open the Hive connection and remember it on the instance.

        Returns:
            The connection object on success, or ``None`` when a Thrift error
            occurs (the error is printed). Other exception types propagate.
        """
        try:
            self.connection = hive.Connection(
                host=self.host,
                port=self.port,
                username=self.username,
                auth='NONE',  # choose a different authentication mode if required
            )
        except Thrift.TException as e:
            print(f"连接Hive失败: {str(e)}")
            return None
        return self.connection

    def execute_query(self, query):
        """Run *query* and return all rows as a JSON string.

        Each row becomes a JSON object keyed by the column names taken from
        ``cursor.description``; if two columns share a name, the later one
        wins because the row is built with ``dict(zip(...))``.

        Returns:
            A JSON array string, or ``None`` when the connection could not be
            established or the query / serialization failed (the error is
            printed).
        """
        if not self.connection:
            self.connect()
        if not self.connection:
            # connect() has already reported the failure; without a connection
            # there is nothing to run the query on.
            return None

        # Bound before the try so the finally clause can tell whether a cursor
        # was actually created.
        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.execute(query)
            results = cursor.fetchall()
            # Column names are the first field of each description entry.
            column_names = [desc[0] for desc in cursor.description]
            # Convert each row tuple into a {column: value} mapping.
            data = [dict(zip(column_names, row)) for row in results]
            return json.dumps(data)
        except Exception as e:
            print(f"执行查询失败: {str(e)}")
            return None
        finally:
            if cursor is not None:
                cursor.close()

    def close(self):
        """Close the connection if one is open and forget it.

        Resetting ``self.connection`` makes repeated calls harmless and lets a
        later ``execute_query`` reconnect instead of reusing a closed handle.
        """
        if self.connection:
            try:
                self.connection.close()
            finally:
                self.connection = None


# Example usage
if __name__ == '__main__':
    hive_conn = HiveConnection(host='192.168.110.130', port=10000, username='root')
    try:
        json_results = hive_conn.execute_query(
            "SELECT * FROM bs_python_paper_analysis.tb_item LIMIT 10"
        )
        print(json_results)
    finally:
        # Release the connection even if the query path raises.
        hive_conn.close()