#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Import 赵伟军 batch Excel files into 羚牛公司车辆运维成本台账.xlsx and dedupe.""" from __future__ import annotations import re from dataclasses import dataclass from datetime import datetime, timedelta from pathlib import Path from typing import Iterable, List, Optional, Tuple import pandas as pd from openpyxl import load_workbook LEDGER = Path("/Users/sylvawong/Desktop/羚牛公司车辆运维成本台账.xlsx") SOURCES: List[Path] = [ Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/26-1氢能源点检明细及照片(1).xlsx"), Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/26-2氢能源点检明细及照片(1).xlsx"), Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-2服务站提供的明细表-羚牛(1).xlsx"), Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-2故障车辆维修信息反馈单-羚牛.xlsx"), Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3车辆润滑保养结算申请单羚牛(1).xlsx"), Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3打黄油明细(1).xlsx"), Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3服务站提供的明细表-羚牛(1).xlsx"), Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3故障车辆维修信息反馈单-羚牛.xlsx"), ] PLATE_RE = re.compile( r"(沪[A-Z][A-Z0-9]{5,6}|" r"浙[A-Z][A-Z0-9]{5,6}|" r"粤[A-Z][A-Z0-9]{5,7})", re.I, ) def normalize_plate(raw: object) -> Optional[str]: if raw is None or (isinstance(raw, float) and pd.isna(raw)): return None s = str(raw).strip().replace(" ", "").replace("'", "’").replace("`", "") m = PLATE_RE.search(s) if not m: return None return m.group(1).upper().replace(" ", "") def excel_date_to_ymd(v: object) -> Optional[Tuple[int, int, int]]: if v is None or (isinstance(v, float) and pd.isna(v)): return None if isinstance(v, datetime): return v.year, v.month, v.day if isinstance(v, pd.Timestamp): t = v.to_pydatetime() return t.year, t.month, t.day try: n = float(v) except (TypeError, ValueError): t = pd.to_datetime(v, errors="coerce") if pd.isna(t): return None tt = t.to_pydatetime() return tt.year, tt.month, tt.day base = datetime(1899, 12, 30) dt = base + timedelta(days=int(n)) return dt.year, dt.month, dt.day @dataclass class Row: y: int m: int d: int plate: str fee: float remark: str def key(self) -> Tuple: fee = round(float(self.fee), 2) rmk = re.sub(r"\s+", "", self.remark or "") return (self.y, self.m, self.d, self.plate, fee, rmk) def parse_dianjian_sheet(path: Path, df: pd.DataFrame, tag: str) -> List[Row]: """点检结算汇总明细表:进修日期 + 车牌 + 维修项目/点检结果 + 合计(末列).""" out: List[Row] = [] if df.shape[1] < 14: return out for i in range(3, len(df)): r = df.iloc[i] try: int(float(r[0])) except (TypeError, ValueError): continue plate = normalize_plate(r[2]) if not plate: continue ymd = excel_date_to_ymd(r[1]) if not ymd: continue remark = r[4] if pd.notna(r[4]) else "" remark = str(remark).strip() or "氢能源点检" fee_v = r[13] try: fee = float(fee_v) if pd.notna(fee_v) else 0.0 except (TypeError, ValueError): fee = 0.0 y, mo, d = ymd out.append(Row(y, mo, d, plate, fee, remark)) return out def parse_dianjian_workbook(path: Path) -> List[Row]: df = pd.read_excel(path, sheet_name="点检明细", header=None) c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else "" if "点检结算汇总明细表" not in c00: return [] return parse_dianjian_sheet(path, df, path.name) def _amount_cols_for_huizong(df: pd.DataFrame) -> Tuple[int, int]: """服务站版:两笔「金额」在列 8、12;打黄油等窄表:在列 7、11。""" hdr = df.iloc[1].tolist() if len(df) > 1 else [] flat = " ".join(str(x) for x in hdr if pd.notna(x)) if "车辆所属公司" in flat: return 8, 12 return 7, 11 def parse_weixiu_huizong_sheet(df: pd.DataFrame, tag: str) -> List[Row]: """维修结算汇总明细表(服务站/打黄油):按序号分组,汇总两栏「金额(含税)」.""" out: List[Row] = [] if df.shape[1] < 13: return out c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else "" if "维修结算汇总明细表" not in c00: return [] c8, c12 = _amount_cols_for_huizong(df) def add_amounts(rr) -> float: s = 0.0 for c in (c8, c12): if c >= len(rr): continue v = rr[c] if pd.notna(v) and isinstance(v, (int, float)): s += float(v) return s i = 3 while i < len(df): r = df.iloc[i] try: int(float(r[0])) has_seq = True except (TypeError, ValueError): has_seq = False if not has_seq: # 无序号行:若带独立车牌,则按单车一行入账(批量润滑等) p = normalize_plate(r[2]) if len(r) > 2 else None if p and len(r) > 4: ymd = excel_date_to_ymd(r[1]) if ymd: total = add_amounts(r) remark = str(r[4]).strip() if pd.notna(r[4]) else "维修" y, mo, d = ymd out.append(Row(y, mo, d, p, total, remark)) i += 1 continue ymd = excel_date_to_ymd(r[1]) plate = normalize_plate(r[2]) if not ymd or not plate: i += 1 continue parts: List[str] = [] total = 0.0 if c8 == 8: if len(r) > 4 and pd.notna(r[4]) and str(r[4]).strip(): parts.append(str(r[4]).strip()) else: if len(r) > 3 and pd.notna(r[3]) and str(r[3]).strip(): parts.append(str(r[3]).strip()) total += add_amounts(r) anchor = plate i += 1 while i < len(df): r2 = df.iloc[i] try: int(float(r2[0])) break except (TypeError, ValueError): pass p2 = normalize_plate(r2[2]) if len(r2) > 2 else None if p2 is not None and p2 != anchor: break if len(r2) > 4 and pd.notna(r2[4]) and str(r2[4]).strip(): parts.append(str(r2[4]).strip()) total += add_amounts(r2) i += 1 remark = ";".join(parts) if parts else "维修" y, mo, d = ymd out.append(Row(y, mo, d, anchor, total, remark)) return out def parse_weixiu_workbook(path: Path) -> List[Row]: out: List[Row] = [] xl = pd.ExcelFile(path) for sn in xl.sheet_names: if sn.startswith("Sheet") and sn in ("Sheet2", "Sheet3"): pass try: df = pd.read_excel(path, sheet_name=sn, header=None) except Exception: continue out.extend(parse_weixiu_huizong_sheet(df, f"{path.name}:{sn}")) return out def parse_dahuangyou_only(path: Path) -> List[Row]: df = pd.read_excel(path, sheet_name="Sheet1", header=None) return parse_weixiu_huizong_sheet(df, path.name) def parse_zhaowei_fault_df(df: pd.DataFrame) -> Optional[Row]: if df.shape[0] < 10: return None c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else "" if "故障车辆维修信息反馈单" not in c00: return None ymd = excel_date_to_ymd(df.iloc[1, 0]) if not ymd: return None plate = None symptom = "" for i in range(min(25, len(df))): v = df.iloc[i, 0] if pd.isna(v): continue s = str(v) if "车牌号:" in s or "车牌号:" in s: m = re.search(r"车牌号[::]\s*([^\s]+)", s) if m: plate = normalize_plate(m.group(1)) if "故障现象:" in s or "故障现象:" in s: symptom = re.split(r"故障现象[::]", s, maxsplit=1)[-1].strip() total: Optional[float] = None for i in range(len(df)): for j in range(min(6, df.shape[1])): cell = df.iloc[i, j] if pd.isna(cell): continue st = str(cell) if "费用共计" in st: m = re.search(r"费用共计[::]\s*([\d.]+)", st) if m: total = float(m.group(1)) break if total is not None: break projects: List[str] = [] header_at = None for i in range(len(df)): c0 = df.iloc[i, 0] c1 = df.iloc[i, 1] if df.shape[1] > 1 else None if str(c0).strip() == "序号" and pd.notna(c1) and "维修项目" in str(c1): header_at = i break if header_at is not None: for j in range(header_at + 1, len(df)): c0 = df.iloc[j, 0] try: int(float(c0)) except (TypeError, ValueError): continue proj = df.iloc[j, 1] if df.shape[1] > 1 else None if pd.notna(proj) and str(proj).strip(): projects.append(str(proj).strip()) if plate is None or total is None: return None remark = ";".join(projects) if projects else symptom if not remark: remark = "故障维修" y, mo, d = ymd return Row(y, mo, d, plate, float(total), remark) def parse_fault_workbook(path: Path) -> List[Row]: out: List[Row] = [] xl = pd.ExcelFile(path) for sn in xl.sheet_names: try: df = pd.read_excel(path, sheet_name=sn, header=None) except Exception: continue rec = parse_zhaowei_fault_df(df) if rec: out.append(rec) return out def parse_lubrication_front_axle(path: Path) -> List[Row]: """仅解析「前 轮 保养」:主表车牌 + 附件列中的车牌,70 元/台。""" out: List[Row] = [] try: df = pd.read_excel(path, sheet_name="前 轮 保养", header=None) except Exception: return out c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else "" if "车辆维修保养结算申请单" not in c00: return out ymd = excel_date_to_ymd(df.iloc[1, 0]) if not ymd: return out fee = 70.0 remark = "润滑保养(打黄油)" uniq: List[str] = [] seen = set() for i in range(len(df)): if df.shape[1] < 3: break v = df.iloc[i, 2] if pd.isna(v): continue s = str(v) if "车牌号:" in s: m = re.search(r"车牌号[::]\s*([^\s(]+)", s) if m: s = m.group(1) p = normalize_plate(s) if p and p not in seen: seen.add(p) uniq.append(p) y, mo, d = ymd for p in uniq: out.append(Row(y, mo, d, p, fee, remark)) return out def classify_file(path: Path) -> List[Row]: name = path.name if "氢能源点检" in name and "点检明细" in pd.ExcelFile(path).sheet_names: return parse_dianjian_workbook(path) if "打黄油明细" in name: return parse_dahuangyou_only(path) if "润滑保养结算申请单" in name: return parse_lubrication_front_axle(path) if "服务站提供的明细表" in name: return parse_weixiu_workbook(path) if "故障车辆维修信息反馈单" in name: return parse_fault_workbook(path) return [] def _drop_corrupt_lubrication_rows(rows: List[Row]) -> List[Row]: """剔除异常合并行(多段「润滑保养」串在一条且金额畸高)。""" out: List[Row] = [] for r in rows: rm = r.remark or "" if rm.count("润滑保养") > 2 and r.fee > 500: continue out.append(r) return out def read_ledger_rows() -> Tuple[int, List[Row]]: wb = load_workbook(LEDGER) ws = wb.active header_row = None for i, row in enumerate(ws.iter_rows(min_row=1, max_row=40, values_only=True), start=1): if row and any(c == "年份" for c in row if c is not None): header_row = i break if header_row is None: raise SystemExit("ledger: 未找到表头") col_map = {} for cell in ws[header_row]: if cell.value: col_map[str(cell.value).strip()] = cell.column existing: List[Row] = [] for r in range(header_row + 1, ws.max_row + 1): plate = ws.cell(row=r, column=col_map["车牌号"]).value if plate is None or str(plate).strip() == "": continue try: yy = int(ws.cell(row=r, column=col_map["年份"]).value) mm = int(ws.cell(row=r, column=col_map["月份"]).value) dd = int(ws.cell(row=r, column=col_map["日期"]).value) fee = float(ws.cell(row=r, column=col_map["修理费"]).value or 0) except (TypeError, ValueError): continue remark = ws.cell(row=r, column=col_map["备注"]).value remark = "" if remark is None else str(remark) existing.append(Row(yy, mm, dd, str(plate).strip().upper(), fee, remark)) existing = _drop_corrupt_lubrication_rows(existing) return header_row, existing def write_ledger(header_row: int, rows: List[Row]) -> None: wb = load_workbook(LEDGER) ws = wb.active col_map = {} for cell in ws[header_row]: if cell.value: col_map[str(cell.value).strip()] = cell.column if ws.max_row > header_row: ws.delete_rows(header_row + 1, ws.max_row - header_row) r = header_row + 1 for row in rows: ws.cell(row=r, column=col_map["年份"], value=row.y) ws.cell(row=r, column=col_map["月份"], value=row.m) ws.cell(row=r, column=col_map["日期"], value=row.d) ws.cell(row=r, column=col_map["车牌号"], value=row.plate) ws.cell(row=r, column=col_map["修理费"], value=row.fee) for k in ("保养费", "年审费", "轮胎费", "其他"): if k in col_map: ws.cell(row=r, column=col_map[k], value=None) ws.cell(row=r, column=col_map["小计"], value=row.fee) if "费用是否为公司承担" in col_map: ws.cell(row=r, column=col_map["费用是否为公司承担"], value=None) ws.cell(row=r, column=col_map["备注"], value=row.remark or None) r += 1 wb.save(LEDGER) def main() -> None: imported: List[Row] = [] for p in SOURCES: if not p.is_file(): print("missing:", p) continue rows = classify_file(p) print(f"{p.name}: +{len(rows)}") imported.extend(rows) header_row, existing = read_ledger_rows() merged = existing + imported before = len(merged) seen = set() deduped: List[Row] = [] for row in merged: k = row.key() if k in seen: continue seen.add(k) deduped.append(row) deduped.sort(key=lambda x: (x.y, x.m, x.d, x.plate, x.remark)) write_ledger(header_row, deduped) print("---") print("existing ledger:", len(existing)) print("imported:", len(imported)) print("merged before dedupe:", before) print("after dedupe:", len(deduped)) print("removed duplicates:", before - len(deduped)) if __name__ == "__main__": main()