Files
ONE-OS/scripts/sync_zhaowei_batch_to_ledger.py
王冕 a27e3b8e43 feat: sync full workspace including web modules, docs, and configurations to Gitea
Optimized the root .gitignore to exclude virtual environments, node modules,
and temp folders to ensure clean and lightweight version tracking.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-09 18:12:25 +08:00

463 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Import 赵伟军 batch Excel files into 羚牛公司车辆运维成本台账.xlsx and dedupe."""
from __future__ import annotations
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterable, List, Optional, Tuple
import pandas as pd
from openpyxl import load_workbook
LEDGER = Path("/Users/sylvawong/Desktop/羚牛公司车辆运维成本台账.xlsx")
SOURCES: List[Path] = [
Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/26-1氢能源点检明细及照片(1).xlsx"),
Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/26-2氢能源点检明细及照片(1).xlsx"),
Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-2服务站提供的明细表-羚牛(1).xlsx"),
Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-2故障车辆维修信息反馈单-羚牛.xlsx"),
Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3车辆润滑保养结算申请单羚牛(1).xlsx"),
Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3打黄油明细(1).xlsx"),
Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3服务站提供的明细表-羚牛(1).xlsx"),
Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3故障车辆维修信息反馈单-羚牛.xlsx"),
]
PLATE_RE = re.compile(
r"(沪[A-Z][A-Z0-9]{5,6}|"
r"浙[A-Z][A-Z0-9]{5,6}|"
r"粤[A-Z][A-Z0-9]{5,7})",
re.I,
)
def normalize_plate(raw: object) -> Optional[str]:
if raw is None or (isinstance(raw, float) and pd.isna(raw)):
return None
s = str(raw).strip().replace(" ", "").replace("'", "").replace("`", "")
m = PLATE_RE.search(s)
if not m:
return None
return m.group(1).upper().replace(" ", "")
def excel_date_to_ymd(v: object) -> Optional[Tuple[int, int, int]]:
if v is None or (isinstance(v, float) and pd.isna(v)):
return None
if isinstance(v, datetime):
return v.year, v.month, v.day
if isinstance(v, pd.Timestamp):
t = v.to_pydatetime()
return t.year, t.month, t.day
try:
n = float(v)
except (TypeError, ValueError):
t = pd.to_datetime(v, errors="coerce")
if pd.isna(t):
return None
tt = t.to_pydatetime()
return tt.year, tt.month, tt.day
base = datetime(1899, 12, 30)
dt = base + timedelta(days=int(n))
return dt.year, dt.month, dt.day
@dataclass
class Row:
y: int
m: int
d: int
plate: str
fee: float
remark: str
def key(self) -> Tuple:
fee = round(float(self.fee), 2)
rmk = re.sub(r"\s+", "", self.remark or "")
return (self.y, self.m, self.d, self.plate, fee, rmk)
def parse_dianjian_sheet(path: Path, df: pd.DataFrame, tag: str) -> List[Row]:
"""点检结算汇总明细表:进修日期 + 车牌 + 维修项目/点检结果 + 合计(末列)."""
out: List[Row] = []
if df.shape[1] < 14:
return out
for i in range(3, len(df)):
r = df.iloc[i]
try:
int(float(r[0]))
except (TypeError, ValueError):
continue
plate = normalize_plate(r[2])
if not plate:
continue
ymd = excel_date_to_ymd(r[1])
if not ymd:
continue
remark = r[4] if pd.notna(r[4]) else ""
remark = str(remark).strip() or "氢能源点检"
fee_v = r[13]
try:
fee = float(fee_v) if pd.notna(fee_v) else 0.0
except (TypeError, ValueError):
fee = 0.0
y, mo, d = ymd
out.append(Row(y, mo, d, plate, fee, remark))
return out
def parse_dianjian_workbook(path: Path) -> List[Row]:
df = pd.read_excel(path, sheet_name="点检明细", header=None)
c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
if "点检结算汇总明细表" not in c00:
return []
return parse_dianjian_sheet(path, df, path.name)
def _amount_cols_for_huizong(df: pd.DataFrame) -> Tuple[int, int]:
"""服务站版:两笔「金额」在列 8、12打黄油等窄表在列 7、11。"""
hdr = df.iloc[1].tolist() if len(df) > 1 else []
flat = " ".join(str(x) for x in hdr if pd.notna(x))
if "车辆所属公司" in flat:
return 8, 12
return 7, 11
def parse_weixiu_huizong_sheet(df: pd.DataFrame, tag: str) -> List[Row]:
"""维修结算汇总明细表(服务站/打黄油):按序号分组,汇总两栏「金额(含税)」."""
out: List[Row] = []
if df.shape[1] < 13:
return out
c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
if "维修结算汇总明细表" not in c00:
return []
c8, c12 = _amount_cols_for_huizong(df)
def add_amounts(rr) -> float:
s = 0.0
for c in (c8, c12):
if c >= len(rr):
continue
v = rr[c]
if pd.notna(v) and isinstance(v, (int, float)):
s += float(v)
return s
i = 3
while i < len(df):
r = df.iloc[i]
try:
int(float(r[0]))
has_seq = True
except (TypeError, ValueError):
has_seq = False
if not has_seq:
# 无序号行:若带独立车牌,则按单车一行入账(批量润滑等)
p = normalize_plate(r[2]) if len(r) > 2 else None
if p and len(r) > 4:
ymd = excel_date_to_ymd(r[1])
if ymd:
total = add_amounts(r)
remark = str(r[4]).strip() if pd.notna(r[4]) else "维修"
y, mo, d = ymd
out.append(Row(y, mo, d, p, total, remark))
i += 1
continue
ymd = excel_date_to_ymd(r[1])
plate = normalize_plate(r[2])
if not ymd or not plate:
i += 1
continue
parts: List[str] = []
total = 0.0
if c8 == 8:
if len(r) > 4 and pd.notna(r[4]) and str(r[4]).strip():
parts.append(str(r[4]).strip())
else:
if len(r) > 3 and pd.notna(r[3]) and str(r[3]).strip():
parts.append(str(r[3]).strip())
total += add_amounts(r)
anchor = plate
i += 1
while i < len(df):
r2 = df.iloc[i]
try:
int(float(r2[0]))
break
except (TypeError, ValueError):
pass
p2 = normalize_plate(r2[2]) if len(r2) > 2 else None
if p2 is not None and p2 != anchor:
break
if len(r2) > 4 and pd.notna(r2[4]) and str(r2[4]).strip():
parts.append(str(r2[4]).strip())
total += add_amounts(r2)
i += 1
remark = "".join(parts) if parts else "维修"
y, mo, d = ymd
out.append(Row(y, mo, d, anchor, total, remark))
return out
def parse_weixiu_workbook(path: Path) -> List[Row]:
out: List[Row] = []
xl = pd.ExcelFile(path)
for sn in xl.sheet_names:
if sn.startswith("Sheet") and sn in ("Sheet2", "Sheet3"):
pass
try:
df = pd.read_excel(path, sheet_name=sn, header=None)
except Exception:
continue
out.extend(parse_weixiu_huizong_sheet(df, f"{path.name}:{sn}"))
return out
def parse_dahuangyou_only(path: Path) -> List[Row]:
df = pd.read_excel(path, sheet_name="Sheet1", header=None)
return parse_weixiu_huizong_sheet(df, path.name)
def parse_zhaowei_fault_df(df: pd.DataFrame) -> Optional[Row]:
if df.shape[0] < 10:
return None
c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
if "故障车辆维修信息反馈单" not in c00:
return None
ymd = excel_date_to_ymd(df.iloc[1, 0])
if not ymd:
return None
plate = None
symptom = ""
for i in range(min(25, len(df))):
v = df.iloc[i, 0]
if pd.isna(v):
continue
s = str(v)
if "车牌号:" in s or "车牌号:" in s:
m = re.search(r"车牌号[:]\s*([^\s]+)", s)
if m:
plate = normalize_plate(m.group(1))
if "故障现象:" in s or "故障现象:" in s:
symptom = re.split(r"故障现象[:]", s, maxsplit=1)[-1].strip()
total: Optional[float] = None
for i in range(len(df)):
for j in range(min(6, df.shape[1])):
cell = df.iloc[i, j]
if pd.isna(cell):
continue
st = str(cell)
if "费用共计" in st:
m = re.search(r"费用共计[:]\s*([\d.]+)", st)
if m:
total = float(m.group(1))
break
if total is not None:
break
projects: List[str] = []
header_at = None
for i in range(len(df)):
c0 = df.iloc[i, 0]
c1 = df.iloc[i, 1] if df.shape[1] > 1 else None
if str(c0).strip() == "序号" and pd.notna(c1) and "维修项目" in str(c1):
header_at = i
break
if header_at is not None:
for j in range(header_at + 1, len(df)):
c0 = df.iloc[j, 0]
try:
int(float(c0))
except (TypeError, ValueError):
continue
proj = df.iloc[j, 1] if df.shape[1] > 1 else None
if pd.notna(proj) and str(proj).strip():
projects.append(str(proj).strip())
if plate is None or total is None:
return None
remark = "".join(projects) if projects else symptom
if not remark:
remark = "故障维修"
y, mo, d = ymd
return Row(y, mo, d, plate, float(total), remark)
def parse_fault_workbook(path: Path) -> List[Row]:
out: List[Row] = []
xl = pd.ExcelFile(path)
for sn in xl.sheet_names:
try:
df = pd.read_excel(path, sheet_name=sn, header=None)
except Exception:
continue
rec = parse_zhaowei_fault_df(df)
if rec:
out.append(rec)
return out
def parse_lubrication_front_axle(path: Path) -> List[Row]:
"""仅解析「前 轮 保养」:主表车牌 + 附件列中的车牌70 元/台。"""
out: List[Row] = []
try:
df = pd.read_excel(path, sheet_name="前 轮 保养", header=None)
except Exception:
return out
c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
if "车辆维修保养结算申请单" not in c00:
return out
ymd = excel_date_to_ymd(df.iloc[1, 0])
if not ymd:
return out
fee = 70.0
remark = "润滑保养(打黄油)"
uniq: List[str] = []
seen = set()
for i in range(len(df)):
if df.shape[1] < 3:
break
v = df.iloc[i, 2]
if pd.isna(v):
continue
s = str(v)
if "车牌号:" in s:
m = re.search(r"车牌号[:]\s*([^\s(]+)", s)
if m:
s = m.group(1)
p = normalize_plate(s)
if p and p not in seen:
seen.add(p)
uniq.append(p)
y, mo, d = ymd
for p in uniq:
out.append(Row(y, mo, d, p, fee, remark))
return out
def classify_file(path: Path) -> List[Row]:
name = path.name
if "氢能源点检" in name and "点检明细" in pd.ExcelFile(path).sheet_names:
return parse_dianjian_workbook(path)
if "打黄油明细" in name:
return parse_dahuangyou_only(path)
if "润滑保养结算申请单" in name:
return parse_lubrication_front_axle(path)
if "服务站提供的明细表" in name:
return parse_weixiu_workbook(path)
if "故障车辆维修信息反馈单" in name:
return parse_fault_workbook(path)
return []
def _drop_corrupt_lubrication_rows(rows: List[Row]) -> List[Row]:
"""剔除异常合并行(多段「润滑保养」串在一条且金额畸高)。"""
out: List[Row] = []
for r in rows:
rm = r.remark or ""
if rm.count("润滑保养") > 2 and r.fee > 500:
continue
out.append(r)
return out
def read_ledger_rows() -> Tuple[int, List[Row]]:
wb = load_workbook(LEDGER)
ws = wb.active
header_row = None
for i, row in enumerate(ws.iter_rows(min_row=1, max_row=40, values_only=True), start=1):
if row and any(c == "年份" for c in row if c is not None):
header_row = i
break
if header_row is None:
raise SystemExit("ledger: 未找到表头")
col_map = {}
for cell in ws[header_row]:
if cell.value:
col_map[str(cell.value).strip()] = cell.column
existing: List[Row] = []
for r in range(header_row + 1, ws.max_row + 1):
plate = ws.cell(row=r, column=col_map["车牌号"]).value
if plate is None or str(plate).strip() == "":
continue
try:
yy = int(ws.cell(row=r, column=col_map["年份"]).value)
mm = int(ws.cell(row=r, column=col_map["月份"]).value)
dd = int(ws.cell(row=r, column=col_map["日期"]).value)
fee = float(ws.cell(row=r, column=col_map["修理费"]).value or 0)
except (TypeError, ValueError):
continue
remark = ws.cell(row=r, column=col_map["备注"]).value
remark = "" if remark is None else str(remark)
existing.append(Row(yy, mm, dd, str(plate).strip().upper(), fee, remark))
existing = _drop_corrupt_lubrication_rows(existing)
return header_row, existing
def write_ledger(header_row: int, rows: List[Row]) -> None:
wb = load_workbook(LEDGER)
ws = wb.active
col_map = {}
for cell in ws[header_row]:
if cell.value:
col_map[str(cell.value).strip()] = cell.column
if ws.max_row > header_row:
ws.delete_rows(header_row + 1, ws.max_row - header_row)
r = header_row + 1
for row in rows:
ws.cell(row=r, column=col_map["年份"], value=row.y)
ws.cell(row=r, column=col_map["月份"], value=row.m)
ws.cell(row=r, column=col_map["日期"], value=row.d)
ws.cell(row=r, column=col_map["车牌号"], value=row.plate)
ws.cell(row=r, column=col_map["修理费"], value=row.fee)
for k in ("保养费", "年审费", "轮胎费", "其他"):
if k in col_map:
ws.cell(row=r, column=col_map[k], value=None)
ws.cell(row=r, column=col_map["小计"], value=row.fee)
if "费用是否为公司承担" in col_map:
ws.cell(row=r, column=col_map["费用是否为公司承担"], value=None)
ws.cell(row=r, column=col_map["备注"], value=row.remark or None)
r += 1
wb.save(LEDGER)
def main() -> None:
imported: List[Row] = []
for p in SOURCES:
if not p.is_file():
print("missing:", p)
continue
rows = classify_file(p)
print(f"{p.name}: +{len(rows)}")
imported.extend(rows)
header_row, existing = read_ledger_rows()
merged = existing + imported
before = len(merged)
seen = set()
deduped: List[Row] = []
for row in merged:
k = row.key()
if k in seen:
continue
seen.add(k)
deduped.append(row)
deduped.sort(key=lambda x: (x.y, x.m, x.d, x.plate, x.remark))
write_ledger(header_row, deduped)
print("---")
print("existing ledger:", len(existing))
print("imported:", len(imported))
print("merged before dedupe:", before)
print("after dedupe:", len(deduped))
print("removed duplicates:", before - len(deduped))
if __name__ == "__main__":
main()