#!/usr/bin/env python3 """ SQL 导入工具 """ import pymysql import sys def execute_sql_file(host, port, user, password, sql_file): """执行 SQL 文件""" try: # 读取 SQL 文件 with open(sql_file, 'r', encoding='utf-8') as f: sql_content = f.read() # 连接数据库(不指定数据库,让 SQL 中的 USE 语句生效) connection = pymysql.connect( host=host, port=port, user=user, password=password, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor, autocommit=False ) print(f"✓ 连接数据库成功: {host}:{port}") # 分割 SQL 语句(按分号分割,但要处理存储过程等特殊情况) statements = [] current_statement = [] in_delimiter = False for line in sql_content.split('\n'): line = line.strip() # 跳过注释和空行 if not line or line.startswith('--') or line.startswith('#'): continue # 处理 DELIMITER if line.upper().startswith('DELIMITER'): in_delimiter = not in_delimiter continue current_statement.append(line) # 判断语句结束 if not in_delimiter and line.endswith(';'): statements.append(' '.join(current_statement)) current_statement = [] # 执行 SQL 语句 with connection.cursor() as cursor: success_count = 0 error_count = 0 for i, statement in enumerate(statements, 1): if not statement.strip(): continue try: cursor.execute(statement) connection.commit() success_count += 1 # 每 10 条打印一次进度 if success_count % 10 == 0: print(f" 已执行 {success_count} 条语句...") except Exception as e: error_count += 1 print(f"✗ 语句 {i} 执行失败: {str(e)[:100]}") # 继续执行下一条 print(f"\n✓ 执行完成: 成功 {success_count} 条, 失败 {error_count} 条") connection.close() return True except Exception as e: print(f"✗ 错误: {e}") return False if __name__ == '__main__': # 数据库配置 DB_HOST = '47.103.115.36' DB_PORT = 3306 DB_USER = 'root' DB_PASSWORD = 'Passw0rd2026' # SQL 文件列表 sql_files = [ 'energy_init_all.sql', 'energy_mock_data.sql' ] print("=" * 60) print("OneOS Energy 模块数据导入") print("=" * 60) for sql_file in sql_files: print(f"\n>>> 导入 {sql_file}") success = execute_sql_file(DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, sql_file) if not success: print(f"✗ {sql_file} 导入失败") sys.exit(1) print("\n" + "=" * 60) print("✓ 所有数据导入完成") print("=" * 60)