Files
ONE-OS/scripts/sync_repair_ledger_dedupe.py
王冕 a27e3b8e43 feat: sync full workspace including web modules, docs, and configurations to Gitea
Optimized the root .gitignore to exclude virtual environments, node modules,
and temp folders to ensure clean and lightweight version tracking.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-09 18:12:25 +08:00

587 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""One-off: merge repair Excel sources into 羚牛公司车辆运维成本台账.xlsx and dedupe."""
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterable, Iterator, List, Optional, Tuple
import pandas as pd
from openpyxl import load_workbook
# Ledger workbook that this script reads, merges new rows into, and saves back
# to the same path.
LEDGER = Path("/Users/sylvawong/Desktop/羚牛公司车辆运维成本台账.xlsx")
# Source folders; each one is walked recursively for .xlsx/.xls files.
# Entries that are not existing directories are skipped by iter_xlsx_files.
ROOTS = [
Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年2月常州维修明细"),
Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年2月金华维修费"),
Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年3-4月金华(中顺维修费)"),
Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年3月常州维修费"),
Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年3月开封维修费"),
Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年4月河南开封维修费"),
Path("/Users/sylvawong/Desktop/26年维修费/沈帅/上海昱巷2026年1月维修费"),
Path("/Users/sylvawong/Desktop/26年维修费/沈帅/上海昱巷2026年2月维修费"),
Path("/Users/sylvawong/Desktop/26年维修费/沈帅/上海昱巷2026年3月维修费"),
]
@dataclass(frozen=True)
class Row:
    """One repair-cost entry, either parsed from a source sheet or read from the ledger."""

    y: int  # year
    m: int  # month
    d: int  # day of month
    plate: str  # licence plate text
    fee: float  # repair fee
    remark: str  # free-text description of the repair
    source: str = ""  # where the entry came from; not part of the dedupe key

    def key(self) -> Tuple:
        """Return the tuple used to recognise duplicate entries.

        The fee is rounded to two decimals and all whitespace is removed from
        the remark, so entries differing only in those details compare equal.
        ``source`` is deliberately left out.
        """
        rounded_fee = round(float(self.fee), 2)
        compact_remark = re.sub(r"\s+", "", self.remark or "")
        date_part = (self.y, self.m, self.d)
        return date_part + (self.plate, rounded_fee, compact_remark)
def iter_xlsx_files(roots: Iterable[Path]) -> Iterator[Path]:
    """Yield the path of every Excel workbook found under ``roots``.

    Each root is walked recursively. Roots that are not directories are
    ignored. A file qualifies when its name ends in ``.xlsx`` or ``.xls``
    (case-insensitive) and does not start with ``~$`` (the prefix Office
    gives to its temporary owner files).
    """
    excel_suffixes = (".xlsx", ".xls")
    for base in roots:
        if base.is_dir():
            for folder, _subdirs, names in os.walk(base):
                for name in names:
                    is_temp = name.startswith("~$")
                    if not is_temp and name.lower().endswith(excel_suffixes):
                        yield Path(folder) / name
def parse_cn_date_yy(s: object) -> Optional[Tuple[int, int, int]]:
    """Parse a two-digit-year Chinese date such as ``26年3月5日``.

    All whitespace is removed before matching, and text after ``日`` is
    ignored. Years below 100 are shifted into the 2000s.

    Returns ``(year, month, day)``, or ``None`` for ``None``, a float NaN, or
    text that does not start with the expected pattern.
    """
    if s is None:
        return None
    if isinstance(s, float) and pd.isna(s):
        return None
    compact = re.sub(r"\s+", "", str(s).strip())
    found = re.match(r"^(\d{2})年(\d{1,2})月(\d{1,2})日", compact)
    if found is None:
        return None
    year, month, day = (int(part) for part in found.groups())
    if year < 100:
        year += 2000
    return year, month, day
def parse_cn_date_yyyy(s: object) -> Optional[Tuple[int, int, int]]:
    """Parse a four-digit-year Chinese date such as ``2026年3月5日``.

    Whitespace around the value and between the date parts is tolerated, and
    text after ``日`` is ignored.

    Returns ``(year, month, day)``, or ``None`` for ``None``, a float NaN, or
    text that does not start with the expected pattern.
    """
    missing = s is None or (isinstance(s, float) and pd.isna(s))
    if missing:
        return None
    pattern = r"^(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日"
    found = re.match(pattern, str(s).strip())
    if found is None:
        return None
    year, month, day = found.groups()
    return int(year), int(month), int(day)
def parse_any_date_cell(s: object) -> Optional[Tuple[int, int, int]]:
    """Turn a spreadsheet cell into ``(year, month, day)``.

    Tries, in order: a ``datetime`` instance, ``pd.to_datetime``, the
    four-digit-year Chinese format, then the two-digit-year Chinese format.

    Returns ``None`` for any missing value (``None``, NaN, ``pd.NaT``,
    ``pd.NA``) and for cells none of the parsers understand.
    """
    if s is None:
        return None
    # pd.NaT is itself a datetime instance whose .year/.month/.day are NaN, so
    # every pandas missing marker must be rejected before the isinstance test;
    # otherwise a blank date cell would come back as (nan, nan, nan).
    try:
        if pd.isna(s):
            return None
    except (TypeError, ValueError):
        # Not a scalar that can be tested for missingness; let the parsers
        # below decide.
        pass
    if isinstance(s, datetime):
        return s.year, s.month, s.day
    # NOTE(review): a bare number is treated by pd.to_datetime as an offset
    # from the epoch, not as an Excel serial date - confirm no source sheet
    # stores dates as plain numbers.
    t = pd.to_datetime(s, errors="coerce")
    if pd.notna(t):
        tt = t.to_pydatetime()
        return tt.year, tt.month, tt.day
    x = parse_cn_date_yyyy(s)
    if x:
        return x
    return parse_cn_date_yy(s)
def month_from_parent_dir(path: Path) -> Optional[Tuple[int, int]]:
    """Read ``(year, month)`` from the name of the folder directly containing ``path``.

    The folder name is searched for ``<4 digits>年<1-2 digits>月``, e.g.
    '2026年4月河南开封维修费' gives ``(2026, 4)``. Returns ``None`` when the
    folder name holds no such fragment.
    """
    folder_name = path.parent.name
    found = re.search(r"(\d{4})年(\d{1,2})月", folder_name)
    if found is None:
        return None
    year_text, month_text = found.groups()
    return int(year_text), int(month_text)
def parse_standard_feedback_df(df: pd.DataFrame) -> Optional[Tuple[int, int, int, str, float, str]]:
    """Extract one repair record from a 车辆维修信息反馈单 sheet.

    ``df`` is the sheet as read with ``header=None``. The function reads:

    * the date from row 1, column 0 (two-digit year first, then four-digit);
    * the plate from a "车牌号:" cell in column 0 of the first 30 rows;
    * a fallback remark from a "车辆维修信息:" cell in the same range;
    * the item table that follows the row whose first two cells are
      "序号" / "...维修项目...", collecting the project names of numbered rows
      until a "费用总计" / "费用共计" cell supplies the total.

    Returns ``(year, month, day, plate, total_fee, remark)``, or ``None`` when
    the sheet has fewer than three rows or the date, plate, or total cannot be
    found.
    """
    if df.shape[0] < 3:
        return None
    date_cell = df.iloc[1, 0]
    ymd = parse_cn_date_yy(date_cell) or parse_cn_date_yyyy(date_cell)
    if ymd is None:
        return None
    y, mo, d = ymd

    # Labels may end in an ASCII ":" or a full-width ":" (U+FF1A). The
    # full-width form is written as an escape so it survives tools that
    # normalise look-alike characters.
    colon = r"[:\uff1a]"
    plate_re = re.compile(r"车牌号" + colon + r"\s*([^\s]+)")
    info_re = re.compile(r"车辆维修信息" + colon)
    total_re = re.compile(r"(?:费用总计|费用共计)" + colon + r"\s*([\d.]+)")

    plate = None
    info_fallback = ""
    for i in range(min(30, df.shape[0])):
        v = df.iloc[i, 0]
        if pd.isna(v):
            continue
        s = str(v)
        m = plate_re.search(s)
        if m:
            plate = m.group(1).strip()
        m = info_re.search(s)
        if m:
            # Everything after the first label occurrence is the description.
            info_fallback = s[m.end():].strip()

    # Locate the header row of the item table.
    header_idx = None
    has_second_col = df.shape[1] > 1
    for i in range(df.shape[0]):
        c0 = df.iloc[i, 0]
        if pd.isna(c0) or str(c0).strip() != "序号":
            continue
        c1 = df.iloc[i, 1] if has_second_col else None
        if pd.notna(c1) and "维修项目" in str(c1):
            header_idx = i
            break

    projects: List[str] = []
    total_fee: Optional[float] = None
    if header_idx is not None:
        for j in range(header_idx + 1, df.shape[0]):
            c0 = df.iloc[j, 0]
            s0 = "" if pd.isna(c0) else str(c0)
            if "费用总计" in s0 or "费用共计" in s0:
                m = total_re.search(s0)
                if m:
                    try:
                        total_fee = float(m.group(1))
                    except ValueError:
                        # e.g. "1.2.3": treat as "no usable total".
                        total_fee = None
                # The totals row ends the item table whether or not it parsed.
                break
            # Only rows whose first cell is a sequence number are items.
            try:
                int(float(c0))
            except (TypeError, ValueError, OverflowError):
                continue
            proj = df.iloc[j, 1] if has_second_col else None
            if pd.notna(proj):
                p = str(proj).strip()
                if p:
                    projects.append(p)

    remark = "".join(projects) if projects else info_fallback
    if plate is None or total_fee is None:
        return None
    return y, mo, d, plate, float(total_fee), remark
def parse_yuyu_feedback_df(df: pd.DataFrame) -> Optional[Tuple[int, int, int, str, float, str]]:
    """Extract one repair record from a Shanghai Yuxiang 故障车辆维修信息反馈单 sheet.

    ``df`` is the sheet as read with ``header=None``. The function reads:

    * the date from row 1, column 0 (four-digit Chinese format first, then
      any format ``parse_any_date_cell`` accepts);
    * the plate from a "车牌号:" cell in column 0 of the first 25 rows;
    * a fallback remark from a "故障现象:" cell in the same range;
    * the item table that follows the "序号" / "...维修项目..." header row,
      collecting project names of numbered rows until a "费用共计" cell in
      column 0 or 1 supplies the total.

    Returns ``(year, month, day, plate, total_fee, remark)``, or ``None`` when
    the sheet has fewer than three rows or the date, plate, or total cannot be
    found.
    """
    if df.shape[0] < 3:
        return None
    date_cell = df.iloc[1, 0]
    ymd = parse_cn_date_yyyy(date_cell)
    if ymd is None:
        ymd = parse_any_date_cell(date_cell)
    if ymd is None or ymd[1] == 0 or ymd[2] == 0:
        return None
    y, mo, d = ymd

    # Labels may end in an ASCII ":" or a full-width ":" (U+FF1A). The
    # full-width form is written as an escape so it survives tools that
    # normalise look-alike characters.
    colon = r"[:\uff1a]"
    plate_re = re.compile(r"车牌号" + colon + r"\s*([^\s]+)")
    symptom_re = re.compile(r"故障现象" + colon)
    total_re = re.compile(r"费用共计" + colon + r"\s*([\d.]+)")

    plate = None
    symptom = ""
    for i in range(min(25, df.shape[0])):
        v = df.iloc[i, 0]
        if pd.isna(v):
            continue
        s = str(v)
        m = plate_re.search(s)
        if m:
            p = m.group(1).strip()
            if p:
                plate = p
        m = symptom_re.search(s)
        if m:
            # Everything after the first label occurrence is the symptom text.
            symptom = s[m.end():].strip()

    # Locate the header row of the item table.
    header_idx = None
    has_second_col = df.shape[1] > 1
    for i in range(df.shape[0]):
        c0 = df.iloc[i, 0]
        if pd.isna(c0) or str(c0).strip() != "序号":
            continue
        c1 = df.iloc[i, 1] if has_second_col else None
        if pd.notna(c1) and "维修项目" in str(c1):
            header_idx = i
            break

    projects: List[str] = []
    total_fee: Optional[float] = None
    if header_idx is not None:
        for j in range(header_idx + 1, df.shape[0]):
            c0 = df.iloc[j, 0]
            c1 = df.iloc[j, 1] if has_second_col else None
            s0 = "" if pd.isna(c0) else str(c0)
            s1 = "" if pd.isna(c1) else str(c1)
            if "费用共计" in s0 or "费用共计" in s1:
                # Prefer the second column when both mention the total.
                cell = s1 if "费用共计" in s1 else s0
                m = total_re.search(cell)
                if m:
                    try:
                        total_fee = float(m.group(1))
                    except ValueError:
                        # e.g. "1.2.3": treat as "no usable total".
                        total_fee = None
                # The totals row ends the item table whether or not it parsed.
                break
            # Only rows whose first cell is a sequence number are items.
            try:
                int(float(c0))
            except (TypeError, ValueError, OverflowError):
                continue
            if pd.notna(c1):
                p = str(c1).strip()
                if p:
                    projects.append(p)

    remark = "".join(projects) if projects else symptom
    if plate is None or total_fee is None:
        return None
    return y, mo, d, plate, float(total_fee), remark
def parse_mingrui_chaidui(path: Path, df: pd.DataFrame) -> List[Row]:
    """Parse a Mingrui-style itemised sheet into ledger rows.

    ``df`` is the first sheet read with ``header=None`` and is used only to
    confirm that row 1 starts with "序号". The sheet is then re-read from
    ``path`` with that row as the header. The columns 车牌号, 时间, 维修项目
    and 价格 are required.

    Rows are skipped when the plate is blank, the project mentions "合计"
    (a summary row), or the time is not a two-digit-year Chinese date. An
    unreadable or missing price becomes 0.0.

    Returns the parsed rows, or an empty list when the layout does not match.
    """
    out: List[Row] = []
    if df.shape[0] < 3:
        return out
    if str(df.iloc[1, 0]).strip() != "序号":
        return out
    sub = pd.read_excel(path, sheet_name=0, header=1)

    # Map whitespace-stripped header text to the real column label so that a
    # header like " 价格 " is both detected AND usable for lookups. Looking
    # rows up by the bare name would return None for a padded header and
    # silently drop every row.
    by_stripped: dict = {}
    for c in sub.columns:
        by_stripped.setdefault(str(c).strip(), c)
    need = ("车牌号", "时间", "维修项目", "价格")
    if any(k not in by_stripped for k in need):
        return out
    # An exactly named column always wins over a padded look-alike.
    plate_c, time_c, proj_c, fee_c = (
        k if k in sub.columns else by_stripped[k] for k in need
    )

    for _, r in sub.iterrows():
        plate = r.get(plate_c)
        if pd.isna(plate) or str(plate).strip() == "":
            continue
        proj = r.get(proj_c)
        if pd.notna(proj) and "合计" in str(proj):
            continue
        ymd = parse_cn_date_yy(r.get(time_c))
        if ymd is None:
            continue
        y, mo, d = ymd
        fee = r.get(fee_c)
        try:
            fee_f = float(fee) if pd.notna(fee) else 0.0
        except (TypeError, ValueError):
            fee_f = 0.0
        remark = "" if pd.isna(proj) else str(proj).strip()
        out.append(Row(y, mo, d, str(plate).strip(), fee_f, remark, path.name))
    return out
def parse_jinhua_chaidui(path: Path, df: pd.DataFrame) -> List[Row]:
    """Parse a Jinhua itemised sheet into ledger rows.

    ``df`` is the first sheet read with ``header=None`` and is used to find
    the header row: the first of the top five rows whose first cell is
    "车牌号". The sheet is then re-read from ``path`` with that row as the
    header. Required columns are 车牌号, 时间, a project column named 项目 or
    维修项目, and a price column whose name starts with 价格.

    Rows are skipped when the plate is blank, the time is not a
    two-digit-year Chinese date, or the project mentions "合计". An
    unreadable or missing price becomes 0.0.

    Returns the parsed rows, or an empty list when the layout does not match.
    """
    out: List[Row] = []
    if df.shape[0] < 3:
        return out
    hdr = None
    for i in range(min(5, df.shape[0])):
        if str(df.iloc[i, 0]).strip() == "车牌号":
            hdr = i
            break
    if hdr is None:
        return out
    sub = pd.read_excel(path, sheet_name=0, header=hdr)

    def label_for(wanted: str):
        """Return the real column label for ``wanted``, tolerating padding."""
        if wanted in sub.columns:
            return wanted
        for c in sub.columns:
            if str(c).strip() == wanted:
                return c
        return None

    # The header row was detected with a stripped comparison, so plate and
    # time are resolved the same way as the project and price columns below.
    plate_col = label_for("车牌号")
    time_col = label_for("时间")
    proj_col = next(
        (c for c in sub.columns if str(c).strip() in ("项目", "维修项目")), None
    )
    price_col = next(
        (c for c in sub.columns if str(c).strip().startswith("价格")), None
    )
    if plate_col is None or time_col is None or proj_col is None or price_col is None:
        return out

    for _, r in sub.iterrows():
        plate = r.get(plate_col)
        if pd.isna(plate) or str(plate).strip() == "":
            continue
        ymd = parse_cn_date_yy(r.get(time_col))
        if ymd is None:
            continue
        y, mo, d = ymd
        proj = r.get(proj_col)
        if pd.notna(proj) and "合计" in str(proj):
            continue
        price = r.get(price_col)
        try:
            fee_f = float(price) if pd.notna(price) else 0.0
        except (TypeError, ValueError):
            fee_f = 0.0
        remark = "" if pd.isna(proj) else str(proj).strip()
        out.append(Row(y, mo, d, str(plate).strip(), fee_f, remark, path.name))
    return out
def parse_kaifeng_chaidui(path: Path, df: pd.DataFrame) -> List[Row]:
    """Parse a Kaifeng itemised sheet, which has no date column, into ledger rows.

    Year and month come from the name of the folder containing ``path``. Every
    row is dated the 1st of that month and given the fixed remark "开封拆堆".
    ``df`` is the first sheet read with ``header=None`` and is used to find
    the header row: the first of the top six rows whose first cell is "序号".
    The sheet is then re-read from ``path`` with that row as the header.

    The plate comes from the 车牌 column, or from the second column when no
    column has that name. The fee comes from the 价格 column. Rows are skipped
    when the first cell is not numeric, the plate is missing or mentions
    "合计", or the fee is not numeric. A blank fee becomes 0.0.

    Returns the parsed rows, or an empty list when the folder name or the
    sheet layout does not match.
    """
    out: List[Row] = []
    mm = month_from_parent_dir(path)
    if mm is None:
        return out
    y0, m0 = mm
    hdr = None
    for i in range(min(6, df.shape[0])):
        cell = df.iloc[i, 0]
        if pd.notna(cell) and str(cell).strip() == "序号":
            hdr = i
            break
    if hdr is None:
        return out
    sub = pd.read_excel(path, sheet_name=0, header=hdr)

    # Column choices do not depend on the row, so resolve them once.
    if "车牌" in sub.columns:
        plate_col = "车牌"
    elif len(sub.columns) > 1:
        plate_col = sub.columns[1]
    else:
        # A single-column sheet has no plate column to fall back to.
        return out
    fee_col = next((c for c in sub.columns if str(c).strip() == "价格"), None)
    if fee_col is None:
        return out

    for _, r in sub.iterrows():
        idx = r.iloc[0]
        if pd.isna(idx):
            continue
        try:
            float(idx)
        except (TypeError, ValueError):
            # Not a numbered item row (e.g. a caption or summary line).
            continue
        plate = r.get(plate_col)
        if plate is None or (isinstance(plate, float) and pd.isna(plate)):
            continue
        raw_fee = r.get(fee_col)
        try:
            fee_f = float(raw_fee) if pd.notna(raw_fee) else 0.0
        except (TypeError, ValueError):
            continue
        if "合计" in str(plate):
            continue
        out.append(Row(y0, m0, 1, str(plate).strip(), fee_f, "开封拆堆", path.name))
    return out
def find_col(df: pd.DataFrame, names: Tuple[str, ...]) -> Optional[str]:
    """Return the first column label of ``df`` whose stripped text is in ``names``.

    Columns are examined in frame order. The label is returned as stored in
    the frame (unstripped). ``None`` means no column matched.
    """
    matches = (label for label in df.columns if str(label).strip() in names)
    return next(matches, None)  # type: ignore[return-value]
def parse_system_table_df(df: pd.DataFrame, source: str) -> List[Row]:
    """Turn one system-export table into ledger rows.

    ``df`` must already have its header row applied. Columns are located by
    stripped header text: plate (车牌号 / 车牌号码, required), report time
    (故障上报时间), solution (解决方案), parts fee (配件价格 / 配件 / 配件费用 /
    配件费) and labour fee (工时 / 人工 / 工时费).

    A row is skipped when the plate is blank, the solution or parts cell
    mentions "合计" (a summary row), or no date can be parsed. The fee is the
    sum of the parts and labour cells; a non-numeric cell contributes nothing.
    The solution text becomes the remark and ``source`` is stored on every row.
    """
    out: List[Row] = []
    if df.shape[0] < 1:
        return out
    plate_c = find_col(df, ("车牌号", "车牌号码"))
    if plate_c is None:
        return out
    date_c = find_col(df, ("故障上报时间",))
    sol_c = find_col(df, ("解决方案",))
    part_c = find_col(df, ("配件价格", "配件", "配件费用", "配件费"))
    labor_c = find_col(df, ("工时", "人工", "工时费"))

    for _, r in df.iterrows():
        plate = r.get(plate_c)
        if plate is None or (isinstance(plate, float) and pd.isna(plate)):
            continue
        plate_s = str(plate).strip()
        if plate_s in ("", "NaN"):
            continue

        sol = r.get(sol_c) if sol_c is not None else None
        if sol is not None and "合计" in str(sol):
            continue
        # The summary-row test on the parts cell must run BEFORE the cell is
        # converted to a number: float("合计") raises, so testing afterwards
        # would crash on exactly the rows the test is meant to skip.
        part_v = r.get(part_c) if part_c is not None else None
        if pd.notna(part_v) and "合计" in str(part_v):
            continue

        ymd = parse_any_date_cell(r.get(date_c)) if date_c is not None else None
        if ymd is None:
            continue

        labor_v = r.get(labor_c) if labor_c is not None else None
        fee = 0.0
        for amount in (part_v, labor_v):
            if pd.notna(amount):
                try:
                    fee += float(amount)
                except (TypeError, ValueError):
                    # Non-numeric text in a fee cell adds nothing.
                    pass

        remark = "" if sol is None or pd.isna(sol) else str(sol).strip()
        y, mo, d = ymd
        out.append(Row(y, mo, d, plate_s, float(fee), remark, source))
    return out
def parse_system_workbook(path: Path) -> List[Row]:
    """Parse every system-export sheet of the workbook at ``path``.

    Each sheet is read with its first row as the header. A sheet is used only
    when it has data rows and its first column is named 车牌号 or 车辆编号;
    matching sheets go through ``parse_system_table_df`` with
    ``"<file name>:<sheet name>"`` as the source. Sheets that cannot be read
    are skipped.
    """
    out: List[Row] = []
    # Open the workbook once and close it deterministically, instead of
    # leaking the handle and re-opening the file for every sheet.
    with pd.ExcelFile(path) as xl:
        for sn in xl.sheet_names:
            try:
                df = xl.parse(sn, header=0)
            except Exception:
                # Deliberate best effort: one unreadable sheet must not stop
                # the rest of the workbook from being imported.
                continue
            if df.shape[0] == 0 or df.shape[1] == 0:
                continue
            c0 = str(df.columns[0]).strip() if df.columns[0] is not None else ""
            if c0 not in ("车牌号", "车辆编号"):
                continue
            out.extend(parse_system_table_df(df, f"{path.name}:{sn}"))
    return out
def classify_and_parse(path: Path) -> List[Row]:
    """Detect the layout of the workbook at ``path`` and parse it into rows.

    The first sheet is read without a header and its top-left cell decides the
    parser, tried in this order:

    1. Kaifeng itemised sheet (no date column);
    2. Mingrui-style itemised sheet;
    3. Jinhua itemised sheet;
    4. Yuxiang fault feedback form, one record per sheet;
    5. standard feedback form (Changzhou / Kaifeng), one record per sheet;
    6. system export tables (also the fallback for everything else).

    When parser 2 or 3 yields nothing, detection continues with the later
    ones. A workbook whose first sheet cannot be read or is empty yields an
    empty list.
    """
    rows: List[Row] = []
    try:
        df0 = pd.read_excel(path, sheet_name=0, header=None)
    except Exception:
        # Best effort: an unreadable workbook contributes no rows.
        return rows
    if df0.shape[0] == 0:
        return rows
    c00 = str(df0.iloc[0, 0]) if pd.notna(df0.iloc[0, 0]) else ""

    # Kaifeng itemised sheet (no date column).
    if "开封拆堆" in c00 or ("拆堆明细" in c00 and "开封" in c00):
        rows.extend(parse_kaifeng_chaidui(path, df0))
        return rows

    # Mingrui-style itemised sheet. The column-count guard keeps narrow
    # sheets (fewer than five columns) from raising IndexError on iloc[1, 4].
    looks_mingrui = (
        df0.shape[0] > 2
        and df0.shape[1] > 4
        and str(df0.iloc[1, 0]).strip() == "序号"
        and "维修项目" in str(df0.iloc[1, 4])
    )
    if "铭瑞" in c00 or looks_mingrui:
        mr = parse_mingrui_chaidui(path, df0)
        if mr:
            return mr

    # Jinhua itemised sheet.
    if "金华维修明细" in c00:
        jh = parse_jinhua_chaidui(path, df0)
        if jh:
            return jh

    # Feedback forms (multi-sheet). The Yuxiang title contains the standard
    # title as a substring, so it has to be tested first.
    if "故障车辆维修信息反馈单" in c00:
        sheet_parser = parse_yuyu_feedback_df
    elif "车辆维修信息反馈单" in c00:
        sheet_parser = parse_standard_feedback_df
    else:
        sheet_parser = None
    if sheet_parser is not None:
        # The context manager closes the workbook handle once all sheets
        # have been read.
        with pd.ExcelFile(path) as xl:
            for sn in xl.sheet_names:
                df = xl.parse(sn, header=None)
                rec = sheet_parser(df)
                if rec:
                    y, mo, d, plate, fee, remark = rec
                    rows.append(Row(y, mo, d, plate, fee, remark, f"{path.name}:{sn}"))
        return rows

    # System fault / system repair / Yuxiang system detail tables. Everything
    # left over is also tried as a system table, because some files start
    # with a merged title cell; no match simply yields an empty list.
    return parse_system_workbook(path)
def read_existing_ledger(path: Path) -> Tuple[int, List[Row]]:
    """Load the rows already present in the ledger workbook.

    The header is the first of the top 40 rows of the active sheet that holds
    a cell equal to "年份". Every later row with a non-blank 车牌号 becomes a
    ``Row`` built from 年份, 月份, 日期, 修理费 and 备注, with the source
    "ledger:existing". Rows whose date parts or fee are not numeric are
    dropped. A blank fee counts as 0.0.

    Returns ``(header_row, rows)`` with a 1-based header row number.

    Raises:
        SystemExit: when no header row is found.
    """
    sheet = load_workbook(path).active

    header_row = None
    top_rows = sheet.iter_rows(min_row=1, max_row=40, values_only=True)
    for row_no, values in enumerate(top_rows, start=1):
        if values and any(v == "年份" for v in values if v is not None):
            header_row = row_no
            break
    if header_row is None:
        raise SystemExit("ledger: 年份 header not found")

    # Header text (stripped) -> 1-based column number.
    col_map: dict = {
        str(cell.value).strip(): cell.column for cell in sheet[header_row] if cell.value
    }

    def value_at(row_no: int, title: str):
        """Return the cell value under the header ``title`` in ``row_no``."""
        return sheet.cell(row=row_no, column=col_map[title]).value

    out: List[Row] = []
    for row_no in range(header_row + 1, sheet.max_row + 1):
        plate = value_at(row_no, "车牌号")
        if plate is None or str(plate).strip() == "":
            continue
        y, mo, d, fee, remark = (
            value_at(row_no, title) for title in ("年份", "月份", "日期", "修理费", "备注")
        )
        try:
            date_parts = (int(y), int(mo), int(d))
            amount = 0.0 if fee is None else float(fee)
        except (TypeError, ValueError):
            continue
        note = "" if remark is None else str(remark)
        out.append(Row(*date_parts, str(plate).strip(), amount, note, "ledger:existing"))
    return header_row, out
def write_ledger(path: Path, header_row: int, rows: List[Row]) -> None:
    """Replace the data area of the ledger with ``rows`` and save in place.

    Everything below ``header_row`` in the active sheet is deleted, then one
    sheet row is written per entry: 年份, 月份, 日期, 车牌号, 修理费, 小计
    (same value as 修理费) and 备注. The columns 保养费, 年审费, 轮胎费, 其他
    and 费用是否为公司承担 are written as empty when present. The workbook is
    saved back to ``path``.
    """
    wb = load_workbook(path)
    ws = wb.active
    # Header text (stripped) -> 1-based column number.
    col_map: dict = {
        str(cell.value).strip(): cell.column for cell in ws[header_row] if cell.value
    }

    stale_rows = ws.max_row - header_row
    if stale_rows > 0:
        ws.delete_rows(header_row + 1, stale_rows)

    blank_titles = ("保养费", "年审费", "轮胎费", "其他")
    for r, entry in enumerate(rows, start=header_row + 1):

        def put(title: str, value, row_no: int = r) -> None:
            """Write ``value`` under the header ``title`` in the current row."""
            ws.cell(row=row_no, column=col_map[title], value=value)

        put("年份", entry.y)
        put("月份", entry.m)
        put("日期", entry.d)
        put("车牌号", entry.plate)
        put("修理费", entry.fee)
        for title in blank_titles:
            if title in col_map:
                put(title, None)
        put("小计", entry.fee)
        if "费用是否为公司承担" in col_map:
            put("费用是否为公司承担", None)
        put("备注", entry.remark or None)
    wb.save(path)
def main() -> None:
    """Import all source workbooks, merge them with the ledger, dedupe and save.

    Existing ledger rows come first in the merge, so when two rows share a
    key the ledger's copy is the one kept. The result is sorted by date,
    plate and remark before being written back, and a short summary is
    printed.
    """
    files = sorted(set(p.resolve() for p in iter_xlsx_files(ROOTS)))
    imported: List[Row] = [row for p in files for row in classify_and_parse(p)]
    header_row, existing = read_existing_ledger(LEDGER)
    merged = existing + imported
    before = len(merged)

    # Dicts keep insertion order, so setdefault retains the first row seen
    # for every key.
    first_by_key: dict = {}
    for row in merged:
        first_by_key.setdefault(row.key(), row)
    deduped: List[Row] = sorted(
        first_by_key.values(),
        key=lambda e: (e.y, e.m, e.d, e.plate, e.remark),
    )

    write_ledger(LEDGER, header_row, deduped)
    after = len(deduped)
    print("files scanned:", len(files))
    print("rows imported from files:", len(imported))
    print("existing ledger rows:", len(existing))
    print("merged before dedupe:", before)
    print("after dedupe:", after)
    print("removed duplicates:", before - after)


if __name__ == "__main__":
    main()