Files
mileage-bonus/calc_engine.py
kkfluous da487c41d4 V3.2.0 结转不依赖考核记录 + 补发查对应月盈亏
1. 结转不依赖考核记录:
   - 1月有结转但2月/3月无考核记录 → 创建虚拟group(目标=满月,实际=0)
   - 虚拟记录正常发放结转奖金
   - 无客户关联 → 不查盈亏,正常发放

2. 补发查对应月盈亏:
   - 补发1月 → 查1月亏损表
   - 补发2月 → 查2月亏损表
   - 当月/结转/累计补发 → 查当月亏损表
   - 奖金发放记录新增"盈亏查询月"列

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 13:30:42 +08:00

275 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""计算引擎:规则、读取、分组、结转/补发/累计逻辑"""
import openpyxl
from collections import defaultdict
RULES = {
'交投40辆4.5T普货': {'km': 3000, '': 150},
'交投190辆4.5T冷链车': {'km': 3000, '': 150},
'羚牛136辆4.5T冷链车': {'km': 5000, '': 260},
'恒运50辆4.5T普货': {'km': 5000, '': 260},
'恒运50辆4.5T 普货': {'km': 5000, '': 260},
'羚牛100辆18T': {'km': 6000, '': 1000},
}
DAYS = {1: 31, 2: 28, 3: 31}
def read_file(fp, month):
wb = openpyxl.load_workbook(fp, data_only=True)
ws = wb['业务考核视图']
h = [c.value for c in next(ws.iter_rows(min_row=1, max_row=1))]
recs = []
for row in ws.iter_rows(min_row=2, values_only=True):
r = dict(zip(h, row))
r['考核天数'] = float(r.get('考核天数') or 0)
r['应考核里程(km)'] = float(r.get('应考核里程(km)') or 0)
r['实际行驶里程(km)'] = float(r.get('实际行驶里程(km)') or 0)
r['月份'] = month
rule = RULES.get(r.get('考核目标', ''), {})
r['月度目标里程'] = rule.get('km', 0)
r['对应月奖励金额'] = rule.get('', 0)
t, a = r['应考核里程(km)'], r['实际行驶里程(km)']
q = a >= t and t > 0
r['是否达标'] = '达标' if q else '未达标'
r['奖金'] = r['对应月奖励金额'] * (r['考核天数'] / DAYS[month]) if q else 0
r['多跑'] = max(0, a - t) if q else 0
r['可结转'] = int(r['多跑'] // r['月度目标里程']) if r['月度目标里程'] > 0 and q else 0
recs.append(r)
wb.close()
return recs
def grp(recs, month):
gs = defaultdict(lambda: {'recs':[], '应考核':0, '实际':0, '奖金':0, '天数':0,
'有达标':False, '目标km':0, '奖励额':0})
for r in recs:
k = (r['车牌号'], r['销售经理'])
g = gs[k]
g['recs'].append(r)
g['应考核'] += r['应考核里程(km)']
g['实际'] += r['实际行驶里程(km)']
g['奖金'] += r['奖金']
g['天数'] += r['考核天数']
if r['是否达标'] == '达标': g['有达标'] = True
g['目标km'] = max(g['目标km'], r['月度目标里程'])
g['奖励额'] = max(g['奖励额'], r['对应月奖励金额'])
g['部门'] = r['部门名称']
g['销售'] = r['销售经理']
g['车牌'] = r['车牌号']
return dict(gs)
def calc_jan_carryover(G1):
for k, g in G1.items():
tc = 0
for r in g['recs']:
if r['是否达标'] == '达标': tc += r['可结转']
g['可结转'] = tc
def calc_feb(G1, G2):
feb_data = {'结转':[], '补发1月':[], '当月':[], '累计补发2月':[]}
for k, g2 in G2.items():
g1 = G1.get(k)
bf = g2['奖励额']
j_t = g1['应考核'] if g1 else 0; j_a = g1['实际'] if g1 else 0
j_q = g1['有达标'] if g1 else False; j_c = g1['可结转'] if g1 else 0
cum_t = j_t + g2['应考核']; cum_a = j_a + g2['实际']
cum_q = cum_a >= cum_t and cum_t > 0
carry = 0
if g1 and j_q and j_c >= 1:
carry = bf
feb_data['结转'].append({'车牌':k[0],'销售':g1['销售'],'部门':g2['部门'],'':carry})
bp1 = 0
if g1 and not j_q and cum_q:
bp1 = g1['奖励额'] * (g1['天数'] / 31)
feb_data['补发1月'].append({'车牌':k[0],'销售':g1['销售'],'部门':g2['部门'],'':bp1})
bonus2 = 0
if g2['有达标'] and carry == 0:
bonus2 = g2['奖金']
feb_data['当月'].append({'车牌':k[0],'销售':g2['销售'],'部门':g2['部门'],'':bonus2})
cbp2 = 0
if not g2['有达标'] and carry == 0 and cum_q:
cbp2 = g2['奖励额'] * (g2['天数'] / 28)
feb_data['累计补发2月'].append({'车牌':k[0],'销售':g2['销售'],'部门':g2['部门'],'':cbp2})
g2['cum_t']=cum_t; g2['cum_a']=cum_a; g2['cum_q']=cum_q
g2['结转']=carry; g2['补发1月']=bp1
g2['补发1月对应']=g1['销售'] if g1 and bp1>0 else ''
g2['当月奖金']=bonus2; g2['累计补发2月']=cbp2; g2['结转占位']=carry>0
fc = sum(r['可结转'] for r in g2['recs'] if r['是否达标']=='达标')
jr = max(0, j_c - (1 if carry>0 else 0))
g2['可结转'] = fc + jr
g2['2月已发'] = carry>0 or bonus2>0 or cbp2>0
g2['1月已补发'] = bp1>0
# 补充1月有结转但2月无考核记录的车 → 创建虚拟2月group并发放结转
for k, g1 in G1.items():
if k not in G2 and g1['可结转'] >= 1:
bf = g1['奖励额']
carry = bf
feb_data['结转'].append({'车牌':k[0],'销售':g1['销售'],'部门':g1['部门'],'':carry})
# 创建虚拟2月group考核里程=满月目标,实际=0
G2[k] = {
'recs':[], '应考核':g1['目标km'], '实际':0, '奖金':0, '天数':DAYS[2],
'有达标':False, '目标km':g1['目标km'], '奖励额':g1['奖励额'],
'部门':g1['部门'], '销售':g1['销售'], '车牌':k[0],
'cum_t':g1['应考核']+g1['目标km'], 'cum_a':g1['实际'],
'cum_q': g1['实际'] >= g1['应考核']+g1['目标km'],
'结转':carry, '补发1月':0, '补发1月对应':'',
'当月奖金':0, '累计补发2月':0, '结转占位':True,
'可结转': max(0, g1['可结转'] - 1),
'2月已发':True, '1月已补发':False,
'虚拟':True, # 标记为无考核记录
}
return feb_data
def calc_mar(G1, G2, G3, feb_data):
mar_data = {'结转':[], '补发1月':[], '补发2月':[], '当月':[], '累计补发3月':[]}
for k, g3 in G3.items():
g1 = G1.get(k); g2 = G2.get(k)
bf = g3['奖励额']
j_t=g1['应考核'] if g1 else 0; j_a=g1['实际'] if g1 else 0
j_q=g1['有达标'] if g1 else False; j_paid=(g1['奖金']>0) if g1 else False
f_t=g2['应考核'] if g2 else 0; f_a=g2['实际'] if g2 else 0
f_paid=g2['2月已发'] if g2 else False; f_carry=g2['可结转'] if g2 else 0
if g1 and j_q: j_paid=True
if g2 and g2.get('1月已补发',False): j_paid=True
cum_t=j_t+f_t+g3['应考核']; cum_a=j_a+f_a+g3['实际']
cum_q=cum_a>=cum_t and cum_t>0
carry=0
if g2 and f_carry>=1:
carry=bf; mar_data['结转'].append({'车牌':k[0],'销售':g2['销售'],'部门':g3['部门'],'':carry})
bj=0
if g1 and not j_paid and cum_q:
bj=g1['奖励额']*(g1['天数']/31); mar_data['补发1月'].append({'车牌':k[0],'销售':g1['销售'],'部门':g3['部门'],'':bj})
bf2=0
if g2 and not f_paid and cum_q:
bf2=g2['奖励额']*(g2['天数']/28); mar_data['补发2月'].append({'车牌':k[0],'销售':g2['销售'],'部门':g3['部门'],'':bf2})
bonus3=0
if g3['有达标'] and carry==0:
bonus3=g3['奖金']; mar_data['当月'].append({'车牌':k[0],'销售':g3['销售'],'部门':g3['部门'],'':bonus3})
cbp3=0
if not g3['有达标'] and carry==0 and cum_q:
cbp3=g3['奖励额']*(g3['天数']/31); mar_data['累计补发3月'].append({'车牌':k[0],'销售':g3['销售'],'部门':g3['部门'],'':cbp3})
g3['cum_t']=cum_t; g3['cum_a']=cum_a; g3['cum_q']=cum_q
g3['结转']=carry; g3['补发1月']=bj; g3['补发1月对应']=g1['销售'] if g1 and bj>0 else ''
g3['补发2月']=bf2; g3['补发2月对应']=g2['销售'] if g2 and bf2>0 else ''
g3['当月奖金']=bonus3; g3['累计补发3月']=cbp3; g3['结转占位']=carry>0
# 补充2月有结转但3月无考核记录的车
for k, g2 in G2.items():
if k not in G3 and g2.get('可结转', 0) >= 1:
bf = g2['奖励额']
carry = bf
mar_data['结转'].append({'车牌':k[0],'销售':g2['销售'],'部门':g2['部门'],'':carry})
G3[k] = {
'recs':[], '应考核':g2['目标km'], '实际':0, '奖金':0, '天数':DAYS[3],
'有达标':False, '目标km':g2['目标km'], '奖励额':g2['奖励额'],
'部门':g2['部门'], '销售':g2['销售'], '车牌':k[0],
'cum_t':(G1.get(k,{}).get('应考核',0))+g2['应考核']+g2['目标km'],
'cum_a':(G1.get(k,{}).get('实际',0))+g2['实际'],
'cum_q':False,
'结转':carry, '补发1月':0, '补发1月对应':'',
'补发2月':0, '补发2月对应':'',
'当月奖金':0, '累计补发3月':0, '结转占位':True,
'虚拟':True,
}
return mar_data
def collect_vehicle_payments(G, feb_data, mar_data):
payments = defaultdict(list)
info = {}
for k, g in G[1].items():
plate = k[0]
if not info.get(plate):
first = g['recs'][0]
info[plate] = {'考核目标':first.get('考核目标',''),'月度奖励':g['奖励额'],'目标km':g['目标km']}
if g['奖金'] > 0:
payments[plate].append({'结算月':1,'对应考核月':1,'业务员':g['销售'],'金额':g['奖金'],'类型':'当月达标','部门':g['部门']})
cat_map = {
('结转',2):2, ('补发1月',2):1, ('当月',2):2, ('累计补发2月',2):2,
('结转',3):3, ('补发1月',3):1, ('补发2月',3):2, ('当月',3):3, ('累计补发3月',3):3,
}
for month, mdata in [(2, feb_data), (3, mar_data)]:
for cat, dl in mdata.items():
assess_month = cat_map.get((cat, month), month)
for d in dl:
payments[d['车牌']].append({'结算月':month,'对应考核月':assess_month,
'业务员':d['销售'],'金额':d[''],'类型':cat,'部门':d['部门']})
for m in G:
for k, g in G[m].items():
if not info.get(k[0]):
first = g['recs'][0]
info[k[0]] = {'考核目标':first.get('考核目标',''),'月度奖励':g['奖励额'],'目标km':g['目标km']}
for plate in payments:
records = sorted(payments[plate], key=lambda x: (x['对应考核月'], x['结算月']))
for i, r in enumerate(records): r['期数'] = i + 1
return payments, info
# (归属公司+车型) → 考核目标 映射,用于补全无考核记录的车辆
VEHICLE_TARGET_MAP = {
('广州开发区交投氢能运营管理有限公司', '4.5吨冷链车'): ('交投190辆4.5T冷链车', 3000, 150),
('广州开发区交投氢能运营管理有限公司', '4.5吨货车'): ('交投40辆4.5T普货', 3000, 150),
('羚牛氢能科技(广东)有限公司', '4.5吨冷链车'): ('羚牛136辆4.5T冷链车', 5000, 260),
('羚牛氢能科技(广东)有限公司', '18吨双飞翼货车'): ('羚牛100辆18T', 6000, 1000),
('现代氢能科技(广州)有限公司', '4.5吨货车'): ('恒运50辆4.5T普货', 5000, 260),
('现代氢能科技(广州)有限公司', '4.5吨货车'): ('恒运50辆4.5T普货', 5000, 260),
}
def read_loss_data(month):
"""读取亏损表,返回 {客户名称: ''/''} 字典。无亏损表返回None"""
import os
fp = f'{month}月.xlsx'
if not os.path.exists(fp):
return None
wb = openpyxl.load_workbook(fp, data_only=True)
ws = wb[wb.sheetnames[0]]
h = [c.value for c in next(ws.iter_rows(min_row=1, max_row=1))]
# 1月: 列名"项目","1月是否亏损"
# 2月: 列名"客户名称","是否亏损"
client_col = None; loss_col = None
for i, col_name in enumerate(h):
if col_name and ('项目' in str(col_name) or '客户' in str(col_name)):
client_col = i
if col_name and '亏损' in str(col_name):
loss_col = i
if client_col is None or loss_col is None:
wb.close()
return None
result = {}
for row in ws.iter_rows(min_row=2, values_only=True):
client = row[client_col]
loss = row[loss_col]
if client:
result[str(client).strip()] = str(loss).strip() if loss else ''
wb.close()
return result
def get_vehicle_client_map(D):
"""从考核源数据构建 {车牌号: 客户名称} 映射(取最新月的客户名)"""
plate_client = {}
for m in sorted(D.keys()):
for r in D[m]:
plate_client[r['车牌号']] = r.get('客户名称', '')
return plate_client
def read_master_vehicles(fp='里程任务考核_Q1汇总.xlsx'):
"""从现有Q1汇总文件读取全量车辆台账"""
wb = openpyxl.load_workbook(fp, data_only=True)
ws = wb['车辆奖金池总览']
h = [c.value for c in next(ws.iter_rows(min_row=1, max_row=1))]
vehicles = []
for row in ws.iter_rows(min_row=2, values_only=True):
r = dict(zip(h, row))
if r.get('车牌号'):
vehicles.append({'车牌号': r['车牌号'], '车架号': r.get('车架号',''),
'归属公司': r.get('归属公司',''), '车型确定': r.get('车型','')})
wb.close()
return vehicles