#!/usr/bin/env python3 # -*- coding: utf-8 -*- """从行驶证影像 OCR 提取检验日期,与车辆表合并导出 Excel。 有照片的车辆:先对影像做 OCR;若未识别到「检验有效期」,再用表内「行驶证检验有效期」补全第三列。 """ from __future__ import annotations import calendar import os import re import subprocess import tempfile from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import date from typing import Dict, List, Optional, Tuple import fitz # pymupdf import pandas as pd from PIL import Image, ImageEnhance # 配置路径(可按需修改) EXCEL_IN = "/Users/sylvawong/Downloads/车辆信息-1776270214730.xlsx" PHOTO_DIR = "/Users/sylvawong/Desktop/证件信息梳理/行驶证" EXCEL_OUT = "/Users/sylvawong/Desktop/CURSOR/ONE-OS/车辆行驶证_OCR核对.xlsx" EXCEL_INSPECTION_COL = "行驶证检验有效期" TESSERACT = "/opt/homebrew/bin/tesseract" OCR_LANG = "chi_sim+eng" MAX_IMAGE_SIDE = 2200 MAX_WORKERS = 3 MAX_PDF_PAGES = 3 PDF_ZOOM = 2.5 def extract_plate_from_filename(basename_no_ext: str) -> Optional[str]: s = basename_no_ext s = re.sub(r"\s*\(\d+\)\s*$", "", s) if "行驶证" in s: s = s.split("行驶证")[0] s = s.rstrip("-_· ") if not s: return None parts = s.split("-") first = parts[0] if ( len(parts) >= 2 and len(parts[1]) == 17 and re.match(r"^[A-HJ-NPR-Z0-9]{17}$", parts[1], re.I) ): cand = first else: cand = first m = re.match(r"^([\u4e00-\u9fa5][A-Z0-9\u4e00-\u9fa5·]{1,14})$", cand) if not m: return None plate = m.group(1).rstrip("-_·") if len(plate) < 6: return None return plate def build_plate_files() -> Dict[str, List[str]]: mapping: Dict[str, List[str]] = {} for fn in os.listdir(PHOTO_DIR): path = os.path.join(PHOTO_DIR, fn) if not os.path.isfile(path): continue base, _ = os.path.splitext(fn) plate = extract_plate_from_filename(base) if not plate: continue mapping.setdefault(plate, []).append(path) return mapping def file_try_order(paths: List[str]) -> List[str]: def score(p: str) -> Tuple[float, str]: fn = os.path.basename(p).lower() # 先年审页(通常含检验记录),再主页,PDF 略靠后(需渲染) if "年审" in fn: tier = 0.0 elif "行驶证" in fn or "行驶" in fn: tier = 1.0 else: tier = 1.5 if fn.endswith(".pdf"): tier += 0.25 return (tier, fn) return sorted(paths, key=score) def load_image_for_ocr(path: str) -> Image.Image: img = Image.open(path).convert("RGB") w, h = img.size if max(w, h) > MAX_IMAGE_SIDE: s = MAX_IMAGE_SIDE / max(w, h) img = img.resize((int(w * s), int(h * s)), Image.LANCZOS) return img def ocr_image(img: Image.Image) -> str: gray = img.convert("L") gray = ImageEnhance.Contrast(gray).enhance(1.35) fd, tmp = tempfile.mkstemp(suffix=".png") os.close(fd) try: gray.save(tmp, format="PNG") cmd = [ TESSERACT, tmp, "stdout", "-l", OCR_LANG, "--psm", "6", "-c", "preserve_interword_spaces=1", ] r = subprocess.run( cmd, capture_output=True, text=True, timeout=180, ) if r.returncode != 0: return "" return r.stdout or "" finally: try: os.unlink(tmp) except OSError: pass def ocr_file(path: str) -> str: try: img = load_image_for_ocr(path) return ocr_image(img) except Exception: return "" def month_end(y: int, m: int) -> date: last = calendar.monthrange(y, m)[1] return date(y, m, last) def parse_inspection_date(text: str) -> Optional[date]: if not text: return None t = text.replace("O", "0").replace("o", "0") # 检验有效期至 2027年06月04日 / 2026-03-31 patterns = [ r"检验有效期至\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日", r"检验有效期至\s*[::]?\s*(\d{4})年(\d{1,2})月(\d{1,2})日", r"检验有效期至\s*[::]?\s*(\d{4})\s*[-/.]\s*(\d{1,2})\s*[-/.]\s*(\d{1,2})", r"检验有效期至\s*[::]?\s*(\d{4})[-/.](\d{1,2})[-/.](\d{1,2})", r"检验有效期\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日", r"有效期至\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日", r"检验有效期至\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月(?!\s*\d{1,2}\s*日)", r"检验有效期至\s*[::]?\s*(\d{4})年(\d{1,2})月(?!\d{1,2}日)", r"检验有效期至\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月\s*", r"(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日\s*[((]?\s*检验", ] for pat in patterns: m = re.search(pat, t) if not m: continue y, mo = int(m.group(1)), int(m.group(2)) if mo < 1 or mo > 12 or y < 2000 or y > 2100: continue g3 = m.group(3) if m.lastindex and m.lastindex >= 3 else None if g3 is not None and str(g3).strip() != "": try: d = int(g3) except ValueError: continue if 1 <= d <= 31: try: return date(y, mo, d) except ValueError: continue return month_end(y, mo) return None def ocr_inspection_from_path(path: str) -> Optional[date]: ext = os.path.splitext(path)[1].lower() if ext == ".pdf": doc = fitz.open(path) try: n = min(len(doc), MAX_PDF_PAGES) for i in range(n): page = doc.load_page(i) mat = fitz.Matrix(PDF_ZOOM, PDF_ZOOM) pix = page.get_pixmap(matrix=mat, alpha=False) mode = "RGB" if pix.n < 4 else "RGBA" img = Image.frombytes(mode, [pix.width, pix.height], pix.samples) if mode == "RGBA": img = img.convert("RGB") w, h = img.size if max(w, h) > MAX_IMAGE_SIDE: s = MAX_IMAGE_SIDE / max(w, h) img = img.resize((int(w * s), int(h * s)), Image.LANCZOS) t = ocr_image(img) d = parse_inspection_date(t) if d: return d finally: doc.close() return None t = ocr_file(path) return parse_inspection_date(t) def parse_excel_inspection_date(val) -> Optional[date]: if val is None or (isinstance(val, float) and pd.isna(val)): return None ts = pd.to_datetime(val, errors="coerce") if pd.isna(ts): return None return ts.date() def next_inspection_for_plate(paths: List[str]) -> Optional[date]: ordered = file_try_order(paths) for p in ordered[:4]: d = ocr_inspection_from_path(p) if d: return d return None def main() -> None: if not os.path.isfile(TESSERACT): raise SystemExit(f"未找到 tesseract: {TESSERACT}") today = date.today() plate_files = build_plate_files() df = pd.read_excel(EXCEL_IN, sheet_name="车辆信息", header=0, engine="openpyxl") plates = [str(x).strip() if pd.notna(x) else "" for x in df["车牌号"]] excel_inspection = ( df[EXCEL_INSPECTION_COL] if EXCEL_INSPECTION_COL in df.columns else pd.Series([pd.NA] * len(df)) ) tasks: List[Tuple[int, str, List[str]]] = [] for idx, plate in enumerate(plates): if plate and plate_files.get(plate): tasks.append((idx, plate, plate_files[plate])) results: Dict[int, Optional[date]] = {} with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex: fut_to_idx = { ex.submit(next_inspection_for_plate, fl): idx for idx, _p, fl in tasks } for fut in as_completed(fut_to_idx): idx = fut_to_idx[fut] try: results[idx] = fut.result() except Exception: results[idx] = None out_rows = [] for idx, plate in enumerate(plates): if not plate: out_rows.append( { "车牌号": plate, "行驶证照片是否存在": "否", "行驶证下次检验日期": "", "行驶证是否过期": "", } ) continue files = plate_files.get(plate, []) if not files: out_rows.append( { "车牌号": plate, "行驶证照片是否存在": "否", "行驶证下次检验日期": "", "行驶证是否过期": "", } ) continue ocr_d = results.get(idx) excel_d = parse_excel_inspection_date(excel_inspection.iloc[idx]) final_d = ocr_d or excel_d if final_d: date_str = final_d.strftime("%Y-%m-%d") expired = "是" if final_d < today else "否" else: date_str = "" expired = "" out_rows.append( { "车牌号": plate, "行驶证照片是否存在": "是", "行驶证下次检验日期": date_str, "行驶证是否过期": expired, } ) out_df = pd.DataFrame(out_rows) os.makedirs(os.path.dirname(EXCEL_OUT), exist_ok=True) with pd.ExcelWriter(EXCEL_OUT, engine="openpyxl") as w: out_df.to_excel(w, index=False, sheet_name="行驶证OCR") has_date = out_df["行驶证下次检验日期"] != "" with_photo = out_df["行驶证照片是否存在"] == "是" ocr_ok = sum(1 for idx, _, _ in tasks if results.get(idx)) excel_only = 0 for idx, plate in enumerate(plates): if not plate or not plate_files.get(plate): continue if ( results.get(idx) is None and parse_excel_inspection_date(excel_inspection.iloc[idx]) is not None ): excel_only += 1 print( f"完成: {EXCEL_OUT}\n" f"今日日期: {today.isoformat()}\n" f"有照片: {int(with_photo.sum())}\n" f"OCR 识别到检验日期: {ocr_ok}\n" f"OCR 未识别、第三列由表内「{EXCEL_INSPECTION_COL}」补全: {excel_only}\n" f"有检验日期(第三列非空): {int(has_date.sum())}" ) if __name__ == "__main__": main()