#!/usr/bin/env python3
"""
Rebuild the Asset and Energy module tables and load mock data.

- Asset tables  -> ``oneos_asset`` database
- Energy tables -> ``oneos_energy`` database
- Connects to the remote MySQL server through pymysql (this sidesteps the
  auth-plugin problem of the local ``mysql`` command-line client)

The SQL is read from ``asset_ddl.sql``, ``energy_ddl.sql`` and
``mock_data.sql`` located next to this script.
"""

import contextlib
import os
import re
import subprocess
import sys

try:
    import pymysql
except ImportError:
    # Resolved lazily by _ensure_pymysql() when main() runs, so that merely
    # importing this module never shells out to pip.
    pymysql = None

# Database connection settings.
# NOTE(review): the defaults below are credentials committed to the source
# tree. Each value can be overridden through the ONEOS_DB_* environment
# variables; consider removing the literal defaults altogether.
DB_CONFIG = {
    "host": os.environ.get("ONEOS_DB_HOST", "47.103.115.36"),
    "port": int(os.environ.get("ONEOS_DB_PORT", "3306")),
    "user": os.environ.get("ONEOS_DB_USER", "root"),
    "password": os.environ.get("ONEOS_DB_PASSWORD", "Passw0rd2026"),
    "charset": "utf8mb4",
}

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Marker line in mock_data.sql that separates the asset rows from the energy rows.
_ENERGY_MARKER = "-- ===================== PART 2: ENERGY ====================="

# MySQL error code for "Unknown table" (raised by DROP on a missing table).
_ER_UNKNOWN_TABLE = 1051


def _ensure_pymysql():
    """Make sure the module-level ``pymysql`` name is usable.

    If the import at module load failed, try a one-off ``pip install`` with
    the current interpreter and import again.

    Raises:
        ImportError: if pymysql still cannot be imported after the install
            attempt.
    """
    global pymysql
    if pymysql is not None:
        return
    print("pymysql 未安装,正在安装...")
    # Argument list instead of a shell string: no quoting problems when
    # sys.executable contains spaces. The exit status is deliberately not
    # checked; a failed install surfaces as ImportError on the next line.
    subprocess.run([sys.executable, "-m", "pip", "install", "pymysql"], check=False)
    import pymysql


def read_sql_file(filename):
    """Return the text of *filename*, resolved relative to this script.

    Args:
        filename: name of a UTF-8 encoded SQL file inside ``SCRIPT_DIR``.

    Returns:
        The complete file content as a string.

    Raises:
        OSError: if the file cannot be opened.
    """
    filepath = os.path.join(SCRIPT_DIR, filename)
    with open(filepath, "r", encoding="utf-8") as f:
        return f.read()


def split_statements(sql_text):
    """Split SQL text into individual statements.

    A statement ends at a line whose stripped text ends with ``;``, so
    multi-line statements such as ``CREATE VIEW`` stay together. Blank lines
    and lines that start with ``--`` are dropped everywhere, including inside
    a multi-line statement. A trailing statement without ``;`` is kept.

    Limitation: this is line based and does not parse SQL. A ``;`` followed
    by a trailing comment on the same line does not terminate the statement.

    Args:
        sql_text: SQL source, possibly containing many statements.

    Returns:
        List of statement strings in source order, each stripped of
        surrounding whitespace.
    """
    statements = []
    current = []

    def flush():
        # Emit the accumulated lines as one statement, ignoring empty results.
        stmt = "\n".join(current).strip()
        if stmt:
            statements.append(stmt)
        current.clear()

    for line in sql_text.split("\n"):
        stripped = line.strip()
        if not stripped or stripped.startswith("--"):
            continue
        current.append(line)
        if stripped.endswith(";"):
            flush()

    # Last statement that was not terminated by a semicolon.
    if current:
        flush()
    return statements


def _is_unknown_table_error(exc):
    """Return True if *exc* is MySQL's "Unknown table" OperationalError (1051)."""
    if pymysql is None:
        return False
    return (
        isinstance(exc, pymysql.err.OperationalError)
        and bool(exc.args)
        and exc.args[0] == _ER_UNKNOWN_TABLE
    )


def execute_sql(cursor, sql_text, label=""):
    """Execute every statement contained in *sql_text* on *cursor*.

    Failures are reported on stdout and counted; execution continues with
    the next statement. An "Unknown table" error (code 1051, e.g. from
    ``DROP`` of a table that does not exist) is counted as a success.

    Args:
        cursor: DB-API cursor providing ``execute(sql)``.
        sql_text: SQL source, split with ``split_statements``.
        label: tag included in error messages to identify the batch.

    Returns:
        Tuple ``(success, errors)`` with the number of statements in each
        category.
    """
    success = 0
    errors = 0
    for i, stmt in enumerate(split_statements(sql_text), 1):
        # Short excerpt of the statement for the error log.
        first_line = stmt.split("\n")[0][:80]
        try:
            cursor.execute(stmt)
        except Exception as e:
            if _is_unknown_table_error(e):
                success += 1
                continue
            errors += 1
            print(f" ❌ [{label}] 语句 {i} 失败: {e}")
            print(f" SQL: {first_line}...")
        else:
            success += 1
    return success, errors


def _connect(database=None, autocommit=True):
    """Open a connection from ``DB_CONFIG``, optionally selecting *database*."""
    kwargs = dict(DB_CONFIG)
    if database is not None:
        kwargs["database"] = database
    conn = pymysql.connect(**kwargs)
    if autocommit:
        conn.autocommit(True)
    return conn


def _run_batches(database, batches):
    """Run SQL batches against *database* and return ``(success, errors)`` totals.

    *batches* is an iterable of ``(announcement, sql_text, label)`` tuples.
    The connection and cursor are closed even if a batch raises.
    """
    total_ok = 0
    total_failed = 0
    with contextlib.closing(_connect(database)) as conn, \
            contextlib.closing(conn.cursor()) as cursor:
        for announcement, sql_text, label in batches:
            print(announcement)
            ok, failed = execute_sql(cursor, sql_text, label)
            total_ok += ok
            total_failed += failed
            print(f" ✅ {label}: {ok} 成功, {failed} 失败")
    return total_ok, total_failed


def _ensure_database(name):
    """Create database *name* (utf8mb4 / utf8mb4_unicode_ci) if it is missing."""
    with contextlib.closing(_connect()) as conn, \
            contextlib.closing(conn.cursor()) as cursor:
        cursor.execute(
            f"CREATE DATABASE IF NOT EXISTS `{name}` "
            "DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
        )


def _report_tables(database):
    """Print every base table of *database* together with its exact row count."""
    with contextlib.closing(_connect(database, autocommit=False)) as conn, \
            contextlib.closing(conn.cursor()) as cursor:
        cursor.execute(
            "SELECT TABLE_NAME FROM information_schema.TABLES "
            "WHERE TABLE_SCHEMA = %s AND TABLE_TYPE = 'BASE TABLE' "
            "ORDER BY TABLE_NAME",
            (database,),
        )
        table_names = [row[0] for row in cursor.fetchall()]
        print(f"\n📊 {database} 数据库: {len(table_names)} 张表")
        for table_name in table_names:
            # Rows are counted with COUNT(*) per table; double any backtick
            # so the identifier stays correctly quoted.
            quoted = table_name.replace("`", "``")
            cursor.execute(f"SELECT COUNT(*) FROM `{quoted}`")
            actual_count = cursor.fetchone()[0]
            print(f" {table_name}: {actual_count} 行")


def main():
    """Rebuild both databases, load the mock data and print a verification report.

    Exits with status 1 when ``mock_data.sql`` lacks the PART 2 marker or
    when any SQL statement failed.
    """
    _ensure_pymysql()

    print("=" * 60)
    print("ONE-OS 数据表重建工具")
    print("=" * 60)

    # Read the SQL files.
    print("\n📄 读取 SQL 文件...")
    asset_ddl = read_sql_file("asset_ddl.sql")
    energy_ddl = read_sql_file("energy_ddl.sql")
    mock_data = read_sql_file("mock_data.sql")

    # Split mock_data into its asset and energy halves.
    parts = mock_data.split(_ENERGY_MARKER)
    if len(parts) != 2:
        print("❌ mock_data.sql 格式错误:找不到 PART 2 分隔符")
        sys.exit(1)
    # The file header and the PART 1 marker need no explicit removal:
    # split_statements drops comment-only lines, leaving the INSERTs.
    asset_mock, energy_mock = parts

    total_success = 0
    total_errors = 0

    # ========== 1. Asset database ==========
    print("\n" + "=" * 60)
    print("🔧 [1/3] 重建 oneos_asset 数据库表...")
    print("=" * 60)
    ok, failed = _run_batches(
        "oneos_asset",
        [
            ("\n📋 执行 Asset DDL(29张表 + 1个视图)...", asset_ddl, "Asset DDL"),
            ("\n📋 插入 Asset Mock 数据...", asset_mock, "Asset Mock"),
        ],
    )
    total_success += ok
    total_errors += failed

    # ========== 2. Energy database ==========
    print("\n" + "=" * 60)
    print("🔧 [2/3] 重建 oneos_energy 数据库表...")
    print("=" * 60)
    # Make sure the oneos_energy database exists before connecting to it.
    _ensure_database("oneos_energy")
    ok, failed = _run_batches(
        "oneos_energy",
        [
            ("\n📋 执行 Energy DDL(9张表)...", energy_ddl, "Energy DDL"),
            ("\n📋 插入 Energy Mock 数据...", energy_mock, "Energy Mock"),
        ],
    )
    total_success += ok
    total_errors += failed

    # ========== 3. Verification ==========
    print("\n" + "=" * 60)
    print("🔍 [3/3] 验证表结构和数据...")
    print("=" * 60)
    _report_tables("oneos_asset")
    _report_tables("oneos_energy")

    # ========== Summary ==========
    print("\n" + "=" * 60)
    print(f"🎉 完成! 总计: {total_success} 成功, {total_errors} 失败")
    print("=" * 60)

    if total_errors > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()