Files
oneos-backend/sql/energy/import_sql.py
kkfluous 2f38a703f9 refactor(energy): 简化事件驱动系统(7个→3个)
- 删除旧事件:BillApprovedEvent, BillCreatedEvent, DeductionCompletedEvent, DetailAuditedEvent, DetailCreatedEvent, RecordMatchedEvent
- 新增事件:BillAuditPassedEvent, DetailAuditPassedEvent
- 保留事件:RecordImportedEvent
- 更新监听器:AccountEventListener, BillEventListener, DetailEventListener
- 清理代码中的旧事件引用和注释

优化原则:前端简单,后端健壮
事件流程:导入→匹配→生成明细→审核→扣款→生成账单→结算
2026-03-16 12:53:14 +08:00

111 lines
3.3 KiB
Python

#!/usr/bin/env python3
"""
SQL 导入工具
"""
import pymysql
import sys
def execute_sql_file(host, port, user, password, sql_file):
"""执行 SQL 文件"""
try:
# 读取 SQL 文件
with open(sql_file, 'r', encoding='utf-8') as f:
sql_content = f.read()
# 连接数据库(不指定数据库,让 SQL 中的 USE 语句生效)
connection = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
autocommit=False
)
print(f"✓ 连接数据库成功: {host}:{port}")
# 分割 SQL 语句(按分号分割,但要处理存储过程等特殊情况)
statements = []
current_statement = []
in_delimiter = False
for line in sql_content.split('\n'):
line = line.strip()
# 跳过注释和空行
if not line or line.startswith('--') or line.startswith('#'):
continue
# 处理 DELIMITER
if line.upper().startswith('DELIMITER'):
in_delimiter = not in_delimiter
continue
current_statement.append(line)
# 判断语句结束
if not in_delimiter and line.endswith(';'):
statements.append(' '.join(current_statement))
current_statement = []
# 执行 SQL 语句
with connection.cursor() as cursor:
success_count = 0
error_count = 0
for i, statement in enumerate(statements, 1):
if not statement.strip():
continue
try:
cursor.execute(statement)
connection.commit()
success_count += 1
# 每 10 条打印一次进度
if success_count % 10 == 0:
print(f" 已执行 {success_count} 条语句...")
except Exception as e:
error_count += 1
print(f"✗ 语句 {i} 执行失败: {str(e)[:100]}")
# 继续执行下一条
print(f"\n✓ 执行完成: 成功 {success_count} 条, 失败 {error_count}")
connection.close()
return True
except Exception as e:
print(f"✗ 错误: {e}")
return False
if __name__ == '__main__':
# 数据库配置
DB_HOST = '47.103.115.36'
DB_PORT = 3306
DB_USER = 'root'
DB_PASSWORD = 'Passw0rd2026'
# SQL 文件列表
sql_files = [
'energy_init_all.sql',
'energy_mock_data.sql'
]
print("=" * 60)
print("OneOS Energy 模块数据导入")
print("=" * 60)
for sql_file in sql_files:
print(f"\n>>> 导入 {sql_file}")
success = execute_sql_file(DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, sql_file)
if not success:
print(f"{sql_file} 导入失败")
sys.exit(1)
print("\n" + "=" * 60)
print("✓ 所有数据导入完成")
print("=" * 60)