Files
ONE-OS/scripts/fill_h2_import_template.py
王冕 a27e3b8e43 feat: sync full workspace including web modules, docs, and configurations to Gitea
Optimized the root .gitignore to exclude virtual environments, node modules,
and temp folders to ensure clean and lightweight version tracking.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-09 18:12:25 +08:00

238 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""将加氢订单 Excel 数据填入导入模板(列名可自动匹配多种写法)。"""
from __future__ import annotations
import argparse
import re
from datetime import datetime
from pathlib import Path
import pandas as pd
from openpyxl import load_workbook
# 模板列 -> 源表中可能出现的列名(按优先级)
SOURCE_CANDIDATES: dict[str, list[str]] = {
"加氢站名称": ["加氢站", "加氢站名称", "站点", "站点名称"],
"订单编号": ["序号", "订单编号", "订单号"],
"加氢时间": ["日期", "加氢时间", "加氢日期", "时间"],
"加氢量": [
"加气量kg",
"加气量kg)", # 半角右括号,常见于台账导出
"加气量(kg)",
"加氢量kg",
"加氢量(kg)",
"加氢量",
"加气量",
],
"车牌": ["车牌", "车牌号"],
"行驶里程数": ["行驶里程(km)", "行驶里程km", "行驶里程", "里程", "公里数"],
}
def _pick_column(df: pd.DataFrame, template_col: str) -> str | None:
for name in SOURCE_CANDIDATES.get(template_col, [template_col]):
if name in df.columns:
return name
return None
def _is_unknown_station(val) -> bool:
if val is None or (isinstance(val, float) and pd.isna(val)):
return True
x = str(val).strip()
if not x or x.lower() in ("nan", "none"):
return True
return x in ("未知加氢站", "未知")
def _repair_unknown_stations(
df: pd.DataFrame,
station_col: str,
order_col: str,
plate_col: str,
time_col: str,
qty_col: str,
) -> pd.DataFrame:
"""源表中同一笔加氢(订单+车牌+时间+加氢量相同)若混有「未知加氢站」与其它站名,用非未知站名回填。
常见于 Excel 导入产生的重复行:一行站名正确、一行为未知。
"""
work = df.copy()
work["_g_t"] = pd.to_datetime(work[time_col], errors="coerce").dt.strftime("%Y-%m-%d %H:%M:%S")
work["_g_v"] = pd.to_numeric(work[qty_col], errors="coerce").round(4)
gcols = [order_col, plate_col, "_g_t", "_g_v"]
def consensus_fill(s: pd.Series) -> pd.Series:
good_mask = ~s.map(_is_unknown_station)
good = s[good_mask]
if good.empty:
return s
uniq = pd.unique(good.astype(str).str.strip())
if len(uniq) != 1:
return s
fill = good.iloc[0]
out = s.copy()
out[~good_mask] = fill
return out
work[station_col] = work.groupby(gcols, dropna=False)[station_col].transform(consensus_fill)
return work.drop(columns=["_g_t", "_g_v"])
def normalize_datetime_cell(val) -> str:
"""统一为 YYYY-MM-DD HH:MM:SS无法解析则返回原值的字符串形式。"""
if val is None or (isinstance(val, float) and pd.isna(val)):
return ""
if isinstance(val, pd.Timestamp):
if pd.isna(val):
return ""
dt = val.to_pydatetime()
return dt.strftime("%Y-%m-%d %H:%M:%S")
if isinstance(val, datetime):
return val.strftime("%Y-%m-%d %H:%M:%S")
if isinstance(val, (int, float)) and not isinstance(val, bool):
# Excel 序列日期
try:
ts = pd.to_datetime(val, unit="D", origin="1899-12-30", errors="coerce")
if pd.isna(ts):
return str(val)
return ts.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
pass
s = str(val).strip()
if not s or s.lower() in ("nan", "none", "-"):
return s if s == "-" else ""
ts = pd.to_datetime(s, errors="coerce")
if pd.isna(ts):
# 常见中文/斜杠格式再试
for pat in (r"(\d{4})[年/-](\d{1,2})[月/-](\d{1,2})",):
m = re.search(pat, s)
if m:
y, mo, d = int(m.group(1)), int(m.group(2)), int(m.group(3))
rest = s[m.end() :].strip()
h = mi = se = 0
m2 = re.search(r"(\d{1,2})\s*[::时]\s*(\d{1,2})(?:\s*[::分]\s*(\d{1,2}))?", rest)
if m2:
h, mi = int(m2.group(1)), int(m2.group(2))
se = int(m2.group(3) or 0)
try:
return datetime(y, mo, d, h, mi, se).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
pass
return s
return ts.strftime("%Y-%m-%d %H:%M:%S")
def fill_template(
template_path: Path,
source_path: Path,
out_path: Path,
source_sheet: str | int = 0,
repair_unknown_station: bool = True,
source_header: int = 0,
) -> None:
template_wb = load_workbook(template_path)
if "加氢订单" not in template_wb.sheetnames:
raise SystemExit("模板中未找到工作表「加氢订单」")
tpl_ws = template_wb["加氢订单"]
header = [c.value for c in next(tpl_ws.iter_rows(min_row=1, max_row=1))]
if not header or any(x is None for x in header):
raise SystemExit("模板首行表头异常")
df = pd.read_excel(source_path, sheet_name=source_sheet, header=source_header)
col_src: dict[str, str] = {}
missing = []
for tpl_col in header:
if tpl_col is None:
continue
tpl_col = str(tpl_col).strip()
src = _pick_column(df, tpl_col)
if src is None:
missing.append(tpl_col)
else:
col_src[tpl_col] = src
required = ["加氢站名称", "订单编号", "加氢时间", "加氢量", "车牌"]
for k in required:
if k not in col_src:
raise SystemExit(
f"源表缺少可映射列: 模板「{k}」需要以下之一 {SOURCE_CANDIDATES.get(k, [])}"
f"当前源表列: {list(df.columns)}"
)
if repair_unknown_station:
st = col_src["加氢站名称"]
df = _repair_unknown_stations(
df,
station_col=st,
order_col=col_src["订单编号"],
plate_col=col_src["车牌"],
time_col=col_src["加氢时间"],
qty_col=col_src["加氢量"],
)
# 清空模板数据行(保留表头)
if tpl_ws.max_row > 1:
tpl_ws.delete_rows(2, tpl_ws.max_row - 1)
time_col = col_src["加氢时间"]
for _, row in df.iterrows():
out_row: list = []
for tpl_col in header:
if tpl_col is None:
out_row.append(None)
continue
tpl_col = str(tpl_col).strip()
if tpl_col not in col_src:
out_row.append("")
continue
src_col = col_src[tpl_col]
v = row[src_col]
if tpl_col == "加氢时间":
out_row.append(normalize_datetime_cell(v))
elif pd.isna(v):
out_row.append("")
else:
out_row.append(v)
tpl_ws.append(out_row)
template_wb.save(out_path)
print(f"已写入 {out_path},共 {len(df)} 行数据。")
if missing:
print("(以下模板列在源表中未匹配到,已留空)", ", ".join(missing))
def main() -> None:
p = argparse.ArgumentParser(description="加氢订单 -> 导入模板")
p.add_argument("--template", type=Path, required=True)
p.add_argument("--source", type=Path, required=True)
p.add_argument("--out", type=Path, required=True)
p.add_argument("--sheet", default="加氢订单", help="源 Excel 工作表名,默认「加氢订单」")
p.add_argument(
"--source-header",
type=int,
default=0,
help="源表表头所在行pandas 的 header0 为第一行;台账「明细台账」一般为 1",
)
p.add_argument(
"--no-repair-unknown-station",
action="store_true",
help="关闭「同笔交易未知加氢站」回填(默认开启,用于修正源表重复行)",
)
args = p.parse_args()
sheet = args.sheet
if str(sheet).isdigit():
sheet = int(sheet)
fill_template(
args.template,
args.source,
args.out,
source_sheet=sheet,
repair_unknown_station=not args.no_repair_unknown_station,
source_header=args.source_header,
)
if __name__ == "__main__":
main()