Files
ONE-OS/scripts/sync_zhaoxiaofeng_to_ledger.py
王冕 a27e3b8e43 feat: sync full workspace including web modules, docs, and configurations to Gitea
Optimized the root .gitignore to exclude virtual environments, node modules,
and temp folders to ensure clean and lightweight version tracking.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-09 18:12:25 +08:00

337 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""赵小峰 维修明细 → 羚牛公司车辆运维成本台账(一车一条、合计入修理费、项目合并备注、去重)."""
from __future__ import annotations
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pandas as pd
from openpyxl import load_workbook
# Absolute, machine-specific locations of the ledger workbook that is rewritten
# and of the two monthly repair-detail workbooks that are imported into it.
LEDGER = Path("/Users/sylvawong/Desktop/羚牛公司车辆运维成本台账.xlsx")
JAN = Path("/Users/sylvawong/Desktop/26年维修费/赵小峰/2026年1月维修明细(1).xlsx")
FEB = Path("/Users/sylvawong/Desktop/26年维修费/赵小峰/羚牛26.2月(1).xls")
# Licence-plate matcher. Each alternative starts with a province character
# (沪, 浙 or 粤) followed by a letter; 浙 plates may carry a trailing 挂.
# The whole plate is capture group 1 and matching is case-insensitive (re.I).
PLATE_RE = re.compile(
    r"(沪[A-Z][A-Z0-9]{5,6}|"
    r"浙[A-Z][A-Z0-9]{4,6}挂?|"
    r"粤[A-Z][A-Z0-9]{5,7}|"
    r"粤[A-Z]{2,3}\d{4,5})",
    re.I,
)
def normalize_plate(raw: object) -> Optional[str]:
    """Return the first licence plate found in *raw*, upper-cased.

    The value is stringified, trimmed and stripped of ASCII spaces before
    PLATE_RE is searched. Returns None when *raw* is None, a float NaN, or
    contains no plate.
    """
    is_blank = raw is None or (isinstance(raw, float) and pd.isna(raw))
    if is_blank:
        return None
    compact = "".join(str(raw).strip().split(" "))
    found = PLATE_RE.search(compact)
    return found.group(1).upper() if found else None
def parse_date_jan(s: object) -> Optional[Tuple[int, int, int]]:
    """Parse a date cell of the January workbook into ``(year, month, day)``.

    Accepted inputs, tried in order:
      1. ``datetime`` instances (including ``pandas.Timestamp``);
      2. anything ``pandas.to_datetime`` can parse;
      3. text shaped like ``YYYY<sep>M<sep>D`` where each separator is any
         single non-digit character, e.g. ``2026年1月5日``.

    Returns None for None, NaN, NaT, unparseable text, or text that matches
    the pattern but is not a real calendar date.
    """
    # pd.NaT is an instance of datetime, so it must be rejected before the
    # isinstance check below; otherwise it would yield (nan, nan, nan).
    if s is None or s is pd.NaT or (isinstance(s, float) and pd.isna(s)):
        return None
    if isinstance(s, datetime):
        return s.year, s.month, s.day
    t = pd.to_datetime(s, errors="coerce")
    if pd.notna(t):
        tt = t.to_pydatetime()
        return tt.year, tt.month, tt.day
    # Fallback for text pandas cannot parse. The pattern already accepts any
    # single non-digit separator, so the text needs no separator rewriting
    # (replacing the empty string would insert a dot between every character
    # and make this match impossible).
    m = re.match(r"^(\d{4})\D(\d{1,2})\D(\d{1,2})", str(s).strip())
    if not m:
        return None
    y, mo, d = (int(g) for g in m.groups())
    try:
        datetime(y, mo, d)  # validation only: reject e.g. month 13 or day 45
    except ValueError:
        return None
    return y, mo, d
def parse_date_feb(v: object) -> Optional[Tuple[int, int, int]]:
    """Parse a date cell of the February workbook into ``(year, month, day)``.

    ``datetime`` instances are used directly; everything else goes through
    ``pandas.to_datetime`` with ``errors="coerce"``.

    Returns None for None, NaN, NaT, or values pandas cannot parse.
    """
    # pd.NaT is an instance of datetime, so it must be rejected before the
    # isinstance check below; otherwise it would yield (nan, nan, nan).
    if v is None or v is pd.NaT or (isinstance(v, float) and pd.isna(v)):
        return None
    if isinstance(v, datetime):
        return v.year, v.month, v.day
    t = pd.to_datetime(v, errors="coerce")
    if pd.isna(t):
        return None
    tt = t.to_pydatetime()
    return tt.year, tt.month, tt.day
def add_line_amounts(rr: pd.Series, c_amt_a: int, c_amt_b: int) -> float:
    """Sum the numeric values found at two column labels of one sheet row.

    Args:
        rr: one row, indexed by integer column label.
        c_amt_a: label of the first amount column.
        c_amt_b: label of the second amount column.

    Returns:
        The sum of the real-number cells as a float. Labels at or beyond
        ``len(rr)``, NaN cells and non-numeric cells (e.g. text) contribute 0.
    """
    # Local import: the module header does not import ``numbers``.
    from numbers import Real

    total = 0.0
    for c in (c_amt_a, c_amt_b):
        if c >= len(rr):
            continue
        v = rr[c]
        # Real covers Python int/float and numpy scalars alike; a plain
        # isinstance(v, (int, float)) check would skip numpy.int64 cells.
        if isinstance(v, Real) and pd.notna(v):
            total += float(v)
    return total
@dataclass
class Row:
    """One ledger entry: a repair fee for one vehicle on one date."""

    y: int  # year
    m: int  # month
    d: int  # day of month
    plate: str  # licence plate
    fee: float  # repair fee
    remark: str  # free-text remark

    def key(self) -> Tuple:
        """Return the identity tuple used to recognise duplicate entries.

        The fee is rounded to two decimals and all whitespace is removed from
        the remark (a missing remark counts as empty), so formatting-only
        differences do not produce distinct keys.
        """
        squeezed = re.sub(r"\s+", "", self.remark if self.remark else "")
        amount = round(float(self.fee), 2)
        return self.y, self.m, self.d, self.plate, amount, squeezed
def parse_january_xlsx(path: Path) -> List[Row]:
    """Parse the January repair settlement summary sheet into ledger rows.

    Rows are grouped by sequence number (column 0): a row with a numeric
    sequence opens a group and the following rows without one continue it.
    The fee of a group comes from the total column (13) when present,
    otherwise from the summed line amounts (columns 7 and 11). The remark is
    built from the item column (3). The result is passed through
    consolidate_by_plate before being returned.

    Returns an empty list when the sheet has fewer than 14 columns or its
    first cell does not contain the expected sheet title.
    """
    # header=None: read raw cells, addressed by integer position.
    df = pd.read_excel(path, sheet_name=0, header=None)
    if df.shape[1] < 14:
        return []
    c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
    if "维修结算汇总明细表" not in c00:
        return []
    # Tax-inclusive amounts are in columns 7 and 11, matching the layout of
    # the 赵小峰 sheet.
    c_amt_a, c_amt_b = 7, 11
    c_total = 13
    out: List[Row] = []
    # Scanning starts at row index 3 (rows 0-2 are presumably title/header
    # rows; confirm against the workbook).
    i = 3
    while i < len(df):
        r = df.iloc[i]
        # A row opens a group only if column 0 holds something numeric
        # (NaN fails int() with ValueError and so counts as "no sequence").
        try:
            int(float(r[0]))
            has_seq = True
        except (TypeError, ValueError):
            has_seq = False
        if not has_seq:
            i += 1
            continue
        # A numbered row whose item cell is the grand-total label ends the scan.
        if len(r) > 3 and pd.notna(r[3]) and str(r[3]).strip() == "合计":
            break
        ymd = parse_date_jan(r[1])
        plate = normalize_plate(r[2])
        if not ymd or not plate:
            i += 1
            continue
        parts: List[str] = []
        fee_from_total: Optional[float] = None
        if len(r) > 3 and pd.notna(r[3]) and str(r[3]).strip():
            parts.append(str(r[3]).strip())
        tv = r[c_total] if c_total < len(r) else None
        if pd.notna(tv):
            try:
                fee_from_total = float(tv)
            except (TypeError, ValueError):
                pass
        sum_lines = add_line_amounts(r, c_amt_a, c_amt_b)
        anchor = plate
        i += 1
        # Absorb continuation rows until a total row, the next numbered row,
        # or a row naming a different plate.
        while i < len(df):
            r2 = df.iloc[i]
            if len(r2) > 3 and pd.notna(r2[3]) and str(r2[3]).strip() == "合计":
                # Step past the total row; the outer loop resumes after it.
                i += 1
                break
            try:
                int(float(r2[0]))
                # Numeric sequence: next group starts here, leave i on it.
                break
            except (TypeError, ValueError):
                pass
            p2 = normalize_plate(r2[2]) if len(r2) > 2 else None
            if p2 is not None and p2 != anchor:
                break
            if len(r2) > 3 and pd.notna(r2[3]) and str(r2[3]).strip():
                parts.append(str(r2[3]).strip())
            # Only the first usable total cell within the group is kept.
            if fee_from_total is None and c_total < len(r2) and pd.notna(r2[c_total]):
                try:
                    fee_from_total = float(r2[c_total])
                except (TypeError, ValueError):
                    pass
            sum_lines += add_line_amounts(r2, c_amt_a, c_amt_b)
            i += 1
        fee_total = fee_from_total if fee_from_total is not None else sum_lines
        # NOTE(review): the join separator is an empty string, so item names
        # run together. consolidate_by_plate splits remarks on a separator,
        # which suggests a delimiter character was lost here; confirm.
        remark = "".join(parts) if parts else "维修"
        y, mo, d = ymd
        out.append(Row(y, mo, d, anchor, float(fee_total or 0), remark))
    return consolidate_by_plate(out)
def parse_february_xls(path: Path) -> List[Row]:
    """Parse the February settlement workbook into ledger rows.

    The sheet is scanned from row index 5. A row whose first cell holds a
    licence plate opens a vehicle section, with the fee read from column 7 of
    that row. The rows below it contribute item names (column 3) and dates
    (column 1) until a page/print footer or the next plate row. The earliest
    date is used, falling back to 2026-02-01 when the section has none. The
    result is passed through consolidate_by_plate before being returned.
    """
    df = pd.read_excel(path, sheet_name=0, header=None)
    out: List[Row] = []
    i = 5
    while i < len(df):
        c0 = df.iloc[i, 0]
        if pd.isna(c0):
            i += 1
            continue
        s0 = str(c0).strip()
        # Header, footer and subtotal rows. ``and`` binds tighter than ``or``,
        # so the column-5 check applies only to the "公司" case; the
        # parentheses make that existing precedence explicit.
        is_noise = (
            s0 in ("车牌", "Page 2 of 2")
            or "车主" in s0
            or ("公司" in s0 and "合计" in str(df.iloc[i, 5]))
        )
        if is_noise:
            i += 1
            continue
        # Accept a regex-recognised plate, or any cell starting with a
        # province character; cells longer than 15 characters are rejected.
        plate = normalize_plate(s0) or (s0 if re.match(r"^[浙沪粤]", s0) else None)
        if not plate or len(s0) > 15:
            i += 1
            continue
        try:
            fee = float(df.iloc[i, 7])
        except (TypeError, ValueError, IndexError):
            fee = 0.0
        # float(NaN) does not raise: an empty amount cell would otherwise put
        # NaN into the ledger, and NaN never compares equal when de-duplicating.
        if pd.isna(fee):
            fee = 0.0
        i += 1
        projects: List[str] = []
        dates: List[Tuple[int, int, int]] = []
        while i < len(df):
            r = df.iloc[i]
            n0 = r[0]
            if pd.notna(n0):
                s = str(n0).strip()
                if "Page" in s or "打印" in s:
                    # Footer row: consume it and close the section.
                    i += 1
                    break
                if normalize_plate(s) and s not in ("车牌",):
                    # Next vehicle: leave i on it for the outer loop.
                    break
            if pd.notna(r[3]):
                proj = str(r[3]).strip()
                if proj and proj not in ("项目名称", "NaN"):
                    projects.append(proj)
            dt = parse_date_feb(r[1])
            if dt:
                dates.append(dt)
            i += 1
        y, mo, d = min(dates) if dates else (2026, 2, 1)
        # NOTE(review): the join separator is an empty string, so item names
        # run together. consolidate_by_plate splits remarks on a separator,
        # which suggests a delimiter character was lost here; confirm.
        remark = "".join(projects) if projects else "维修"
        out.append(Row(y, mo, d, plate.upper(), fee, remark))
    return consolidate_by_plate(out)
def consolidate_by_plate(rows: List[Row]) -> List[Row]:
    """Merge rows sharing a plate into a single row per plate.

    For each plate the fees are summed, the earliest date is kept, and the
    remarks are split into items, de-duplicated in first-seen order and
    re-joined. Plates keep the order of their first appearance. The input
    rows are not modified.

    Args:
        rows: rows parsed from one source file.

    Returns:
        A new list with one row per distinct plate.
    """
    # Local import: the module header imports only ``dataclass``.
    from dataclasses import replace

    # Item separator inside a remark. The reviewed source had an empty string
    # here, which makes str.split raise ValueError; a full-width semicolon is
    # assumed. TODO confirm against the remarks already stored in the ledger.
    sep = ";"

    # dicts keep insertion order, and re-assigning an existing key keeps its
    # position, so no separate ordering list is needed.
    merged: Dict[str, Row] = {}
    for r in rows:
        cur = merged.get(r.plate)
        if cur is None:
            merged[r.plate] = replace(r)  # copy, so the caller's row stays untouched
            continue
        # Tuple comparison orders dates chronologically without building
        # datetime objects (which would raise on an impossible date).
        y, mo, d = min((cur.y, cur.m, cur.d), (r.y, r.m, r.d))
        items: List[str] = []
        for text in (cur.remark, r.remark):
            for item in (text or "").split(sep):
                item = item.strip()
                if item and item not in items:
                    items.append(item)
        merged[r.plate] = replace(
            cur, y=y, m=mo, d=d, fee=cur.fee + r.fee, remark=sep.join(items)
        )
    return list(merged.values())
def read_ledger() -> Tuple[int, List[Row]]:
    """Read the existing entries from the active sheet of the LEDGER workbook.

    The header row is the first of the top 40 rows containing a cell equal to
    "年份"; its stripped cell texts map column names to column numbers.

    Returns:
        ``(header_row, rows)``: the 1-based header row number and one Row per
        data row below it.

    Raises:
        SystemExit: when no header row is found.

    NOTE(review): rows with an empty plate, or whose year/month/day/fee cells
    cannot be converted, are skipped without any message; since write_ledger
    deletes every row below the header, such rows do not survive a sync.
    """
    workbook = load_workbook(LEDGER)
    sheet = workbook.active

    top_rows = sheet.iter_rows(min_row=1, max_row=40, values_only=True)
    header_row = next(
        (
            number
            for number, cells in enumerate(top_rows, start=1)
            if cells and any(c == "年份" for c in cells if c is not None)
        ),
        None,
    )
    if header_row is None:
        raise SystemExit("未找到表头")

    columns = {
        str(cell.value).strip(): cell.column
        for cell in sheet[header_row]
        if cell.value
    }

    def value_at(line: int, title: str) -> object:
        # Raw value of the cell under the named header column.
        return sheet.cell(row=line, column=columns[title]).value

    entries: List[Row] = []
    for line in range(header_row + 1, sheet.max_row + 1):
        plate = value_at(line, "车牌号")
        if plate is None or str(plate).strip() == "":
            continue
        try:
            year = int(value_at(line, "年份"))
            month = int(value_at(line, "月份"))
            day = int(value_at(line, "日期"))
            fee = float(value_at(line, "修理费") or 0)
        except (TypeError, ValueError):
            continue
        note = value_at(line, "备注")
        text = "" if note is None else str(note)
        entries.append(Row(year, month, day, str(plate).strip().upper(), fee, text))
    return header_row, entries
def write_ledger(header_row: int, rows: List[Row]) -> None:
    """Replace every data row of the LEDGER workbook with *rows* and save it.

    All rows below *header_row* are deleted, then one sheet row is written per
    entry: year, month, day, plate, repair fee, subtotal (equal to the repair
    fee) and remark. The other fee columns and the company-borne flag are
    passed None when those columns exist.

    Args:
        header_row: 1-based row number of the header row.
        rows: the entries to write, in output order.
    """
    workbook = load_workbook(LEDGER)
    sheet = workbook.active
    columns = {
        str(cell.value).strip(): cell.column
        for cell in sheet[header_row]
        if cell.value
    }

    stale = sheet.max_row - header_row
    if stale > 0:
        sheet.delete_rows(header_row + 1, stale)

    def put(line: int, title: str, value: object) -> None:
        # Write one value under the named header column.
        sheet.cell(row=line, column=columns[title], value=value)

    for line, entry in enumerate(rows, start=header_row + 1):
        put(line, "年份", entry.y)
        put(line, "月份", entry.m)
        put(line, "日期", entry.d)
        put(line, "车牌号", entry.plate)
        put(line, "修理费", entry.fee)
        for title in ("保养费", "年审费", "轮胎费", "其他"):
            if title in columns:
                put(line, title, None)
        put(line, "小计", entry.fee)
        if "费用是否为公司承担" in columns:
            put(line, "费用是否为公司承担", None)
        put(line, "备注", entry.remark if entry.remark else None)
    workbook.save(LEDGER)
def main() -> None:
    """Import both monthly workbooks into the ledger and rewrite it.

    Each source file that exists is parsed and its row count printed; a
    missing file is reported and skipped. The parsed rows are appended to the
    existing ledger rows, duplicates (by Row.key, first occurrence wins) are
    dropped, the result is sorted by date, plate and remark, written back, and
    a summary of the counts is printed.
    """
    sources = (
        (JAN, parse_january_xlsx, "1月明细:"),
        (FEB, parse_february_xls, "2月结算单:"),
    )
    imported: List[Row] = []
    for source, parse, label in sources:
        if not source.is_file():
            print("missing", source)
            continue
        parsed = parse(source)
        print(label, len(parsed), "条(按车合并后)")
        imported.extend(parsed)

    header_row, existing = read_ledger()
    merged = existing + imported
    before = len(merged)

    # setdefault keeps the first row seen for each key, in encounter order.
    unique = {}
    for entry in merged:
        unique.setdefault(entry.key(), entry)
    deduped: List[Row] = sorted(
        unique.values(),
        key=lambda x: (x.y, x.m, x.d, x.plate, x.remark),
    )

    write_ledger(header_row, deduped)
    print("原台账", len(existing), "合并前", before, "去重后", len(deduped), "剔除重复", before - len(deduped))
# Run the sync only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()