Optimized the root .gitignore to exclude virtual environments, node modules, and temp folders to ensure clean and lightweight version tracking. Co-authored-by: Cursor <cursoragent@cursor.com>
238 lines
8.1 KiB
Python
238 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
||
"""将加氢订单 Excel 数据填入导入模板(列名可自动匹配多种写法)。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import re
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import pandas as pd
|
||
from openpyxl import load_workbook
|
||
|
||
# 模板列 -> 源表中可能出现的列名(按优先级)
|
||
SOURCE_CANDIDATES: dict[str, list[str]] = {
|
||
"加氢站名称": ["加氢站", "加氢站名称", "站点", "站点名称"],
|
||
"订单编号": ["序号", "订单编号", "订单号"],
|
||
"加氢时间": ["日期", "加氢时间", "加氢日期", "时间"],
|
||
"加氢量": [
|
||
"加气量(kg)",
|
||
"加气量(kg)", # 半角右括号,常见于台账导出
|
||
"加气量(kg)",
|
||
"加氢量(kg)",
|
||
"加氢量(kg)",
|
||
"加氢量",
|
||
"加气量",
|
||
],
|
||
"车牌": ["车牌", "车牌号"],
|
||
"行驶里程数": ["行驶里程(km)", "行驶里程(km)", "行驶里程", "里程", "公里数"],
|
||
}
|
||
|
||
|
||
def _pick_column(df: pd.DataFrame, template_col: str) -> str | None:
|
||
for name in SOURCE_CANDIDATES.get(template_col, [template_col]):
|
||
if name in df.columns:
|
||
return name
|
||
return None
|
||
|
||
|
||
def _is_unknown_station(val) -> bool:
|
||
if val is None or (isinstance(val, float) and pd.isna(val)):
|
||
return True
|
||
x = str(val).strip()
|
||
if not x or x.lower() in ("nan", "none"):
|
||
return True
|
||
return x in ("未知加氢站", "未知")
|
||
|
||
|
||
def _repair_unknown_stations(
|
||
df: pd.DataFrame,
|
||
station_col: str,
|
||
order_col: str,
|
||
plate_col: str,
|
||
time_col: str,
|
||
qty_col: str,
|
||
) -> pd.DataFrame:
|
||
"""源表中同一笔加氢(订单+车牌+时间+加氢量相同)若混有「未知加氢站」与其它站名,用非未知站名回填。
|
||
|
||
常见于 Excel 导入产生的重复行:一行站名正确、一行为未知。
|
||
"""
|
||
work = df.copy()
|
||
work["_g_t"] = pd.to_datetime(work[time_col], errors="coerce").dt.strftime("%Y-%m-%d %H:%M:%S")
|
||
work["_g_v"] = pd.to_numeric(work[qty_col], errors="coerce").round(4)
|
||
gcols = [order_col, plate_col, "_g_t", "_g_v"]
|
||
|
||
def consensus_fill(s: pd.Series) -> pd.Series:
|
||
good_mask = ~s.map(_is_unknown_station)
|
||
good = s[good_mask]
|
||
if good.empty:
|
||
return s
|
||
uniq = pd.unique(good.astype(str).str.strip())
|
||
if len(uniq) != 1:
|
||
return s
|
||
fill = good.iloc[0]
|
||
out = s.copy()
|
||
out[~good_mask] = fill
|
||
return out
|
||
|
||
work[station_col] = work.groupby(gcols, dropna=False)[station_col].transform(consensus_fill)
|
||
return work.drop(columns=["_g_t", "_g_v"])
|
||
|
||
|
||
def normalize_datetime_cell(val) -> str:
|
||
"""统一为 YYYY-MM-DD HH:MM:SS;无法解析则返回原值的字符串形式。"""
|
||
if val is None or (isinstance(val, float) and pd.isna(val)):
|
||
return ""
|
||
if isinstance(val, pd.Timestamp):
|
||
if pd.isna(val):
|
||
return ""
|
||
dt = val.to_pydatetime()
|
||
return dt.strftime("%Y-%m-%d %H:%M:%S")
|
||
if isinstance(val, datetime):
|
||
return val.strftime("%Y-%m-%d %H:%M:%S")
|
||
if isinstance(val, (int, float)) and not isinstance(val, bool):
|
||
# Excel 序列日期
|
||
try:
|
||
ts = pd.to_datetime(val, unit="D", origin="1899-12-30", errors="coerce")
|
||
if pd.isna(ts):
|
||
return str(val)
|
||
return ts.strftime("%Y-%m-%d %H:%M:%S")
|
||
except Exception:
|
||
pass
|
||
s = str(val).strip()
|
||
if not s or s.lower() in ("nan", "none", "-"):
|
||
return s if s == "-" else ""
|
||
ts = pd.to_datetime(s, errors="coerce")
|
||
if pd.isna(ts):
|
||
# 常见中文/斜杠格式再试
|
||
for pat in (r"(\d{4})[年/-](\d{1,2})[月/-](\d{1,2})",):
|
||
m = re.search(pat, s)
|
||
if m:
|
||
y, mo, d = int(m.group(1)), int(m.group(2)), int(m.group(3))
|
||
rest = s[m.end() :].strip()
|
||
h = mi = se = 0
|
||
m2 = re.search(r"(\d{1,2})\s*[::时]\s*(\d{1,2})(?:\s*[::分]\s*(\d{1,2}))?", rest)
|
||
if m2:
|
||
h, mi = int(m2.group(1)), int(m2.group(2))
|
||
se = int(m2.group(3) or 0)
|
||
try:
|
||
return datetime(y, mo, d, h, mi, se).strftime("%Y-%m-%d %H:%M:%S")
|
||
except ValueError:
|
||
pass
|
||
return s
|
||
return ts.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
def fill_template(
|
||
template_path: Path,
|
||
source_path: Path,
|
||
out_path: Path,
|
||
source_sheet: str | int = 0,
|
||
repair_unknown_station: bool = True,
|
||
source_header: int = 0,
|
||
) -> None:
|
||
template_wb = load_workbook(template_path)
|
||
if "加氢订单" not in template_wb.sheetnames:
|
||
raise SystemExit("模板中未找到工作表「加氢订单」")
|
||
tpl_ws = template_wb["加氢订单"]
|
||
header = [c.value for c in next(tpl_ws.iter_rows(min_row=1, max_row=1))]
|
||
if not header or any(x is None for x in header):
|
||
raise SystemExit("模板首行表头异常")
|
||
|
||
df = pd.read_excel(source_path, sheet_name=source_sheet, header=source_header)
|
||
col_src: dict[str, str] = {}
|
||
missing = []
|
||
for tpl_col in header:
|
||
if tpl_col is None:
|
||
continue
|
||
tpl_col = str(tpl_col).strip()
|
||
src = _pick_column(df, tpl_col)
|
||
if src is None:
|
||
missing.append(tpl_col)
|
||
else:
|
||
col_src[tpl_col] = src
|
||
required = ["加氢站名称", "订单编号", "加氢时间", "加氢量", "车牌"]
|
||
for k in required:
|
||
if k not in col_src:
|
||
raise SystemExit(
|
||
f"源表缺少可映射列: 模板「{k}」需要以下之一 {SOURCE_CANDIDATES.get(k, [])};"
|
||
f"当前源表列: {list(df.columns)}"
|
||
)
|
||
|
||
if repair_unknown_station:
|
||
st = col_src["加氢站名称"]
|
||
df = _repair_unknown_stations(
|
||
df,
|
||
station_col=st,
|
||
order_col=col_src["订单编号"],
|
||
plate_col=col_src["车牌"],
|
||
time_col=col_src["加氢时间"],
|
||
qty_col=col_src["加氢量"],
|
||
)
|
||
|
||
# 清空模板数据行(保留表头)
|
||
if tpl_ws.max_row > 1:
|
||
tpl_ws.delete_rows(2, tpl_ws.max_row - 1)
|
||
|
||
time_col = col_src["加氢时间"]
|
||
for _, row in df.iterrows():
|
||
out_row: list = []
|
||
for tpl_col in header:
|
||
if tpl_col is None:
|
||
out_row.append(None)
|
||
continue
|
||
tpl_col = str(tpl_col).strip()
|
||
if tpl_col not in col_src:
|
||
out_row.append("")
|
||
continue
|
||
src_col = col_src[tpl_col]
|
||
v = row[src_col]
|
||
if tpl_col == "加氢时间":
|
||
out_row.append(normalize_datetime_cell(v))
|
||
elif pd.isna(v):
|
||
out_row.append("")
|
||
else:
|
||
out_row.append(v)
|
||
tpl_ws.append(out_row)
|
||
|
||
template_wb.save(out_path)
|
||
print(f"已写入 {out_path},共 {len(df)} 行数据。")
|
||
if missing:
|
||
print("(以下模板列在源表中未匹配到,已留空)", ", ".join(missing))
|
||
|
||
|
||
def main() -> None:
|
||
p = argparse.ArgumentParser(description="加氢订单 -> 导入模板")
|
||
p.add_argument("--template", type=Path, required=True)
|
||
p.add_argument("--source", type=Path, required=True)
|
||
p.add_argument("--out", type=Path, required=True)
|
||
p.add_argument("--sheet", default="加氢订单", help="源 Excel 工作表名,默认「加氢订单」")
|
||
p.add_argument(
|
||
"--source-header",
|
||
type=int,
|
||
default=0,
|
||
help="源表表头所在行(pandas 的 header,0 为第一行;台账「明细台账」一般为 1)",
|
||
)
|
||
p.add_argument(
|
||
"--no-repair-unknown-station",
|
||
action="store_true",
|
||
help="关闭「同笔交易未知加氢站」回填(默认开启,用于修正源表重复行)",
|
||
)
|
||
args = p.parse_args()
|
||
sheet = args.sheet
|
||
if str(sheet).isdigit():
|
||
sheet = int(sheet)
|
||
fill_template(
|
||
args.template,
|
||
args.source,
|
||
args.out,
|
||
source_sheet=sheet,
|
||
repair_unknown_station=not args.no_repair_unknown_station,
|
||
source_header=args.source_header,
|
||
)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|