#!/usr/bin/env python3 # -*- coding: utf-8 -*- """One-off: merge repair Excel sources into 羚牛公司车辆运维成本台账.xlsx and dedupe.""" from __future__ import annotations import os import re from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Iterable, Iterator, List, Optional, Tuple import pandas as pd from openpyxl import load_workbook LEDGER = Path("/Users/sylvawong/Desktop/羚牛公司车辆运维成本台账.xlsx") ROOTS = [ Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年2月常州维修明细"), Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年2月金华维修费"), Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年3-4月金华(中顺维修费)"), Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年3月常州维修费"), Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年3月开封维修费"), Path("/Users/sylvawong/Desktop/26年维修费/沈帅/2026年4月河南开封维修费"), Path("/Users/sylvawong/Desktop/26年维修费/沈帅/上海昱巷2026年1月维修费"), Path("/Users/sylvawong/Desktop/26年维修费/沈帅/上海昱巷2026年2月维修费"), Path("/Users/sylvawong/Desktop/26年维修费/沈帅/上海昱巷2026年3月维修费"), ] @dataclass(frozen=True) class Row: y: int m: int d: int plate: str fee: float remark: str source: str = "" def key(self) -> Tuple: fee = round(float(self.fee), 2) rmk = re.sub(r"\s+", "", self.remark or "") return (self.y, self.m, self.d, self.plate, fee, rmk) def iter_xlsx_files(roots: Iterable[Path]) -> Iterator[Path]: for root in roots: if not root.is_dir(): continue for dirpath, _, filenames in os.walk(root): for fn in filenames: if fn.startswith("~$"): continue low = fn.lower() if low.endswith(".xlsx") or low.endswith(".xls"): yield Path(dirpath) / fn def parse_cn_date_yy(s: object) -> Optional[Tuple[int, int, int]]: if s is None or (isinstance(s, float) and pd.isna(s)): return None t = re.sub(r"\s+", "", str(s).strip()) m = re.match(r"^(\d{2})年(\d{1,2})月(\d{1,2})日", t) if not m: return None y, mo, d = int(m.group(1)), int(m.group(2)), int(m.group(3)) if y < 100: y += 2000 return y, mo, d def parse_cn_date_yyyy(s: object) -> Optional[Tuple[int, int, int]]: if s is None or (isinstance(s, float) and pd.isna(s)): return None t = str(s).strip() m = re.match(r"^(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日", t) if not m: return None return int(m.group(1)), int(m.group(2)), int(m.group(3)) def parse_any_date_cell(s: object) -> Optional[Tuple[int, int, int]]: if s is None or (isinstance(s, float) and pd.isna(s)): return None if isinstance(s, datetime): return s.year, s.month, s.day t = pd.to_datetime(s, errors="coerce") if pd.notna(t): tt = t.to_pydatetime() return tt.year, tt.month, tt.day x = parse_cn_date_yyyy(s) if x: return x return parse_cn_date_yy(s) def month_from_parent_dir(path: Path) -> Optional[Tuple[int, int]]: """Parse (year, month) from folder name like '2026年4月河南开封维修费'.""" m = re.search(r"(\d{4})年(\d{1,2})月", path.parent.name) if m: return int(m.group(1)), int(m.group(2)) return None def parse_standard_feedback_df(df: pd.DataFrame) -> Optional[Tuple[int, int, int, str, float, str]]: if df.shape[0] < 3: return None ymd = parse_cn_date_yy(df.iloc[1, 0]) if ymd is None or ymd[0] is None: ymd = parse_cn_date_yyyy(df.iloc[1, 0]) if ymd is None: return None y, mo, d = ymd plate = None info_fallback = "" for i in range(min(30, df.shape[0])): v = df.iloc[i, 0] if pd.isna(v): continue s = str(v) if "车牌号:" in s or "车牌号:" in s: m = re.search(r"车牌号[::]\s*([^\s]+)", s) if m: plate = m.group(1).strip() if "车辆维修信息:" in s or "车辆维修信息:" in s: info_fallback = re.split(r"车辆维修信息[::]", s, maxsplit=1)[-1].strip() header_idx = None for i in range(df.shape[0]): c0 = df.iloc[i, 0] c1 = df.iloc[i, 1] if df.shape[1] > 1 else None if pd.isna(c0): continue if str(c0).strip() == "序号" and pd.notna(c1) and "维修项目" in str(c1): header_idx = i break projects: List[str] = [] total_fee: Optional[float] = None if header_idx is not None: for j in range(header_idx + 1, df.shape[0]): c0 = df.iloc[j, 0] s0 = "" if pd.isna(c0) else str(c0) if "费用总计" in s0 or "费用共计" in s0: m = re.search(r"(?:费用总计|费用共计)[::]\s*([\d.]+)", s0) if m: total_fee = float(m.group(1)) break try: float(c0) int(float(c0)) except (TypeError, ValueError): continue proj = df.iloc[j, 1] if df.shape[1] > 1 else None if pd.notna(proj): p = str(proj).strip() if p: projects.append(p) remark = ";".join(projects) if projects else info_fallback if plate is None or total_fee is None: return None return y, mo, d, plate, float(total_fee), remark def parse_yuyu_feedback_df(df: pd.DataFrame) -> Optional[Tuple[int, int, int, str, float, str]]: """上海昱巷「故障车辆维修信息反馈单」.""" if df.shape[0] < 3: return None ymd = parse_cn_date_yyyy(df.iloc[1, 0]) if ymd is None: ymd = parse_any_date_cell(df.iloc[1, 0]) if ymd is None or ymd[1] == 0 or ymd[2] == 0: return None y, mo, d = ymd plate = None symptom = "" for i in range(min(25, df.shape[0])): v = df.iloc[i, 0] if pd.isna(v): continue s = str(v) if "车牌号:" in s or "车牌号:" in s: m = re.search(r"车牌号[::]\s*([^\s]+)", s) if m: p = m.group(1).strip() if p: plate = p if "故障现象:" in s or "故障现象:" in s: symptom = re.split(r"故障现象[::]", s, maxsplit=1)[-1].strip() header_idx = None for i in range(df.shape[0]): c0 = df.iloc[i, 0] c1 = df.iloc[i, 1] if df.shape[1] > 1 else None if pd.isna(c0): continue if str(c0).strip() == "序号" and pd.notna(c1) and "维修项目" in str(c1): header_idx = i break projects: List[str] = [] total_fee: Optional[float] = None if header_idx is not None: for j in range(header_idx + 1, df.shape[0]): c0 = df.iloc[j, 0] c1 = df.iloc[j, 1] if df.shape[1] > 1 else None s0 = "" if pd.isna(c0) else str(c0) s1 = "" if pd.isna(c1) else str(c1) if "费用共计" in s0 or "费用共计" in s1: cell = s1 if "费用共计" in s1 else s0 m = re.search(r"费用共计[::]\s*([\d.]+)", cell) if m: total_fee = float(m.group(1)) break try: float(c0) int(float(c0)) except (TypeError, ValueError): continue if pd.notna(c1): p = str(c1).strip() if p: projects.append(p) remark = ";".join(projects) if projects else symptom if plate is None or total_fee is None: return None return y, mo, d, plate, float(total_fee), remark def parse_mingrui_chaidui(path: Path, df: pd.DataFrame) -> List[Row]: out: List[Row] = [] if df.shape[0] < 3: return out hdr_row = 1 if str(df.iloc[1, 0]).strip() == "序号" else None if hdr_row is None: return out sub = pd.read_excel(path, sheet_name=0, header=hdr_row) cols = {str(c).strip(): c for c in sub.columns} need = ["车牌号", "时间", "维修项目", "价格"] if not all(k in cols for k in need): return out for _, r in sub.iterrows(): plate = r.get("车牌号") if pd.isna(plate) or str(plate).strip() == "": continue proj = r.get("维修项目") if pd.notna(proj) and "合计" in str(proj): continue ymd = parse_cn_date_yy(r.get("时间")) if ymd is None: continue y, mo, d = ymd fee = r.get("价格") try: fee_f = float(fee) if pd.notna(fee) else 0.0 except (TypeError, ValueError): fee_f = 0.0 remark = "" if pd.isna(proj) else str(proj).strip() out.append(Row(y, mo, d, str(plate).strip(), fee_f, remark, path.name)) return out def parse_jinhua_chaidui(path: Path, df: pd.DataFrame) -> List[Row]: out: List[Row] = [] if df.shape[0] < 3: return out hdr = None for i in range(min(5, df.shape[0])): if str(df.iloc[i, 0]).strip() == "车牌号": hdr = i break if hdr is None: return out sub = pd.read_excel(path, sheet_name=0, header=hdr) time_col = "时间" if "时间" in sub.columns else None proj_col = None for c in sub.columns: cs = str(c).strip() if cs in ("项目", "维修项目"): proj_col = c break price_col = None for c in sub.columns: if str(c).strip().startswith("价格"): price_col = c break if time_col is None or proj_col is None or price_col is None: return out for _, r in sub.iterrows(): plate = r.get("车牌号") if pd.isna(plate) or str(plate).strip() == "": continue ymd = parse_cn_date_yy(r.get(time_col)) if ymd is None: continue y, mo, d = ymd proj = r.get(proj_col) if pd.notna(proj) and "合计" in str(proj): continue try: fee_f = float(r.get(price_col)) if pd.notna(r.get(price_col)) else 0.0 except (TypeError, ValueError): fee_f = 0.0 remark = "" if pd.isna(proj) else str(proj).strip() out.append(Row(y, mo, d, str(plate).strip(), fee_f, remark, path.name)) return out def parse_kaifeng_chaidui(path: Path, df: pd.DataFrame) -> List[Row]: out: List[Row] = [] mm = month_from_parent_dir(path) if mm is None: return out y0, m0 = mm hdr = None for i in range(min(6, df.shape[0])): c0 = str(df.iloc[i, 0]).strip() if pd.notna(df.iloc[i, 0]) else "" if c0 == "序号": hdr = i break if hdr is None: return out sub = pd.read_excel(path, sheet_name=0, header=hdr) for _, r in sub.iterrows(): try: idx = r.iloc[0] if pd.isna(idx): continue float(idx) except (TypeError, ValueError): continue plate = r.get("车牌") if "车牌" in sub.columns else r.get(sub.columns[1]) if plate is None or (isinstance(plate, float) and pd.isna(plate)): continue fee_col = None for c in sub.columns: if str(c).strip() == "价格": fee_col = c break if fee_col is None: continue try: fee_f = float(r.get(fee_col)) if pd.notna(r.get(fee_col)) else 0.0 except (TypeError, ValueError): continue if "合计" in str(plate): continue out.append(Row(y0, m0, 1, str(plate).strip(), fee_f, "开封拆堆", path.name)) return out def find_col(df: pd.DataFrame, names: Tuple[str, ...]) -> Optional[str]: for c in df.columns: cs = str(c).strip() if cs in names: return c # type: ignore[return-value] return None def parse_system_table_df(df: pd.DataFrame, source: str) -> List[Row]: out: List[Row] = [] if df.shape[0] < 1: return out plate_c = find_col(df, ("车牌号", "车牌号码")) if plate_c is None: return out date_c = find_col(df, ("故障上报时间",)) sol_c = find_col(df, ("解决方案",)) part_c = find_col(df, ("配件价格", "配件", "配件费用", "配件费")) labor_c = find_col(df, ("工时", "人工", "工时费")) for _, r in df.iterrows(): plate = r.get(plate_c) if plate is None or (isinstance(plate, float) and pd.isna(plate)): continue plate_s = str(plate).strip() if plate_s == "" or plate_s == "NaN": continue sol = r.get(sol_c) if sol_c else None if sol is not None and "合计" in str(sol): continue ymd = parse_any_date_cell(r.get(date_c)) if date_c else None if ymd is None: continue fee = 0.0 if part_c: fee += float(r.get(part_c) or 0) if pd.notna(r.get(part_c)) else 0.0 if labor_c: v = r.get(labor_c) if pd.notna(v): try: fee += float(v) except (TypeError, ValueError): pass # skip summary rows where part column shows 合计 if part_c: pv = r.get(part_c) if pd.notna(pv) and "合计" in str(pv): continue remark = "" if sol is None or pd.isna(sol) else str(sol).strip() y, mo, d = ymd out.append(Row(y, mo, d, plate_s, float(fee), remark, source)) return out def parse_system_workbook(path: Path) -> List[Row]: out: List[Row] = [] xl = pd.ExcelFile(path) for sn in xl.sheet_names: try: df = pd.read_excel(path, sheet_name=sn, header=0) except Exception: continue if df.shape[0] == 0: continue c0 = str(df.columns[0]).strip() if df.columns[0] is not None else "" if c0 not in ("车牌号", "车辆编号"): continue out.extend(parse_system_table_df(df, f"{path.name}:{sn}")) return out def classify_and_parse(path: Path) -> List[Row]: rows: List[Row] = [] try: df0 = pd.read_excel(path, sheet_name=0, header=None) except Exception: return rows if df0.shape[0] == 0: return rows c00 = str(df0.iloc[0, 0]) if pd.notna(df0.iloc[0, 0]) else "" # 河南开封拆堆(无日期列) if "开封拆堆" in c00 or ("拆堆明细" in c00 and "开封" in c00): rows.extend(parse_kaifeng_chaidui(path, df0)) return rows # 铭瑞类拆堆 if "铭瑞" in c00 or (df0.shape[0] > 2 and str(df0.iloc[1, 0]).strip() == "序号" and "维修项目" in str(df0.iloc[1, 4])): mr = parse_mingrui_chaidui(path, df0) if mr: return mr # 金华拆堆 if "金华维修明细" in c00: jh = parse_jinhua_chaidui(path, df0) if jh: return jh # 昱巷:故障车辆维修信息反馈单(多 sheet) if "故障车辆维修信息反馈单" in c00: xl = pd.ExcelFile(path) for sn in xl.sheet_names: df = pd.read_excel(path, sheet_name=sn, header=None) rec = parse_yuyu_feedback_df(df) if rec: y, mo, d, plate, fee, remark = rec rows.append(Row(y, mo, d, plate, fee, remark, f"{path.name}:{sn}")) return rows # 常州/开封 车辆维修信息反馈单 if "车辆维修信息反馈单" in c00: xl = pd.ExcelFile(path) for sn in xl.sheet_names: df = pd.read_excel(path, sheet_name=sn, header=None) rec = parse_standard_feedback_df(df) if rec: y, mo, d, plate, fee, remark = rec rows.append(Row(y, mo, d, plate, fee, remark, f"{path.name}:{sn}")) return rows # 系统故障 / 系统维修 / 昱巷系统明细 c0h = str(df0.iloc[0, 0]).strip() if pd.notna(df0.iloc[0, 0]) else "" if c0h == "车牌号" or c0h == "车辆编号": rows.extend(parse_system_workbook(path)) return rows # 兜底:按系统表再试(部分文件首行是合并格) alt = parse_system_workbook(path) if alt: return alt return rows def read_existing_ledger(path: Path) -> Tuple[int, List[Row]]: wb = load_workbook(path) ws = wb.active header_row = None for i, row in enumerate(ws.iter_rows(min_row=1, max_row=40, values_only=True), start=1): if row and any(c == "年份" for c in row if c is not None): header_row = i break if header_row is None: raise SystemExit("ledger: 年份 header not found") col_map: dict = {} for cell in ws[header_row]: if cell.value: col_map[str(cell.value).strip()] = cell.column out: List[Row] = [] for r in range(header_row + 1, ws.max_row + 1): plate = ws.cell(row=r, column=col_map["车牌号"]).value if plate is None or str(plate).strip() == "": continue y = ws.cell(row=r, column=col_map["年份"]).value mo = ws.cell(row=r, column=col_map["月份"]).value d = ws.cell(row=r, column=col_map["日期"]).value fee = ws.cell(row=r, column=col_map["修理费"]).value remark = ws.cell(row=r, column=col_map["备注"]).value try: yy, mm, dd = int(y), int(mo), int(d) ff = float(fee) if fee is not None else 0.0 except (TypeError, ValueError): continue rmk = "" if remark is None else str(remark) out.append(Row(yy, mm, dd, str(plate).strip(), ff, rmk, "ledger:existing")) return header_row, out def write_ledger(path: Path, header_row: int, rows: List[Row]) -> None: wb = load_workbook(path) ws = wb.active col_map: dict = {} for cell in ws[header_row]: if cell.value: col_map[str(cell.value).strip()] = cell.column if ws.max_row > header_row: ws.delete_rows(header_row + 1, ws.max_row - header_row) r = header_row + 1 for row in rows: ws.cell(row=r, column=col_map["年份"], value=row.y) ws.cell(row=r, column=col_map["月份"], value=row.m) ws.cell(row=r, column=col_map["日期"], value=row.d) ws.cell(row=r, column=col_map["车牌号"], value=row.plate) ws.cell(row=r, column=col_map["修理费"], value=row.fee) for k in ("保养费", "年审费", "轮胎费", "其他"): if k in col_map: ws.cell(row=r, column=col_map[k], value=None) ws.cell(row=r, column=col_map["小计"], value=row.fee) if "费用是否为公司承担" in col_map: ws.cell(row=r, column=col_map["费用是否为公司承担"], value=None) ws.cell(row=r, column=col_map["备注"], value=row.remark or None) r += 1 wb.save(path) def main() -> None: files = sorted({p.resolve() for p in iter_xlsx_files(ROOTS)}) imported: List[Row] = [] for p in files: imported.extend(classify_and_parse(p)) header_row, existing = read_existing_ledger(LEDGER) merged = existing + imported before = len(merged) seen = set() deduped: List[Row] = [] for row in merged: k = row.key() if k in seen: continue seen.add(k) deduped.append(row) deduped.sort(key=lambda x: (x.y, x.m, x.d, x.plate, x.remark)) write_ledger(LEDGER, header_row, deduped) print("files scanned:", len(files)) print("rows imported from files:", len(imported)) print("existing ledger rows:", len(existing)) print("merged before dedupe:", before) print("after dedupe:", len(deduped)) print("removed duplicates:", before - len(deduped)) if __name__ == "__main__": main()