Commit 5eee534d by 宋毅

fanhui_change

parent fdea6c67
......@@ -8,20 +8,13 @@
#可利用SQLite3访问数据库,或先访问数据库获取
import os, sys,sqlite3
from flask import Flask
from flask_cors import *
from jinja2 import Template
server = Flask(__name__)
interface_path = os.path.dirname(__file__)
sys.path.insert(0, interface_path) # 将当前文件的父目录加入临时系统变量
server.config['JSON_AS_ASCII'] = False
CORS(server, supports_credentials=True) # 跨域请求
#连接数据库,读取描述信息,建立原始大字典
conn = sqlite3.connect('tax_online.db',check_same_thread=False)
# 数据库获取原始大字典
def GetAllDictFromSql():
# 连接数据库,读取描述信息,建立原始大字典
conn = sqlite3.connect('tax_online.db', check_same_thread=False)
cursor = conn.cursor()
dict_all_v1 = {} # 数据库获取原始大字典
try:
......@@ -93,206 +86,219 @@ def GetAllDictFromSql():
cursor.close()
conn.close()
return dict_all_v1, tax_desc
dict_all_v1, tax_desc = GetAllDictFromSql()
# 获取传入的参数,建立传参后的大字典
def CreatBdictFromJson(dict_all,GetAllinfo):
# 获取传入的参数
GetAllinfo_new=GetAllinfo["ri"]
if GetAllinfo["ri"] and type(GetAllinfo["ri"])=="str":
print("111111111",type(GetAllinfo["ri"]),GetAllinfo["ri"])
GetAllinfo_new = eval(GetAllinfo_new) # 如果key不存在,会返回None,因此需进行判断
print(GetAllinfo_new)
for factors_name, value in dict_all.items():
try:
status_get=GetAllinfo_new[factors_name]
# print(type(status_get),status_get)
if type(status_get)==dict:
dict_all[factors_name]["status"] = status_get
# print("____",dict_all)
if type(status_get) == str:
status_get=eval(status_get)
dict_all[factors_name]["status"] = status_get
except:
pass
print("!!!!!!!",dict_all)
class getriskinfo:
def __init__(self):
self.dict_all_v1, self.tax_desc = GetAllDictFromSql()
# 计算总体风险异常比例、以及被监控风险比例和被稽查风险比例
qualified = [] # 合格
disqualified = [] # 不合格
for k, v in dict_all.items():
s_status = v['status']["status_key"]
s_score = v['s_score']
if s_status == 1:
qualified.append(s_score)
if s_status == 2:
disqualified.append(s_score)
A_count = sum(qualified) + sum(disqualified) # 计算所有合格与不合格对应指标权重之和(分母)
print(A_count)
B_count = sum(disqualified) # 计算所有不合格对应指标权重之和(分子)
print(B_count)
r1 = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
print(r1, type(r1))
t_r1 = str(int(r1 * 100)) + "%"
t_r2 = str(int((r1 - 0.29) * 100)) + "%"
t_r3 = str(int((r1 - 0.49) * 100)) + "%"
standard1 = 0.3
standard2 = 0.5
# 建立第一层规则
rules = {r1 <= standard1: {"t_r1": t_r1, "t_r2": "1%", "t_r3": "1%"},
r1 > standard1 and r1 <= standard2: {"t_r1": t_r1, "t_r2": t_r2, "t_r3": "1%"},
r1 > standard2: {"t_r1": t_r1, "t_r2": t_r2, "t_r3": t_r3}}
ALLRisk = {}
for k1, v1 in rules.items():
if k1:
ALLRisk = v1
# print("+++++++++++++",ALLRisk)
return dict_all, ALLRisk
# 获取传入的参数,建立传参后的大字典
def CreatBdictFromJson(self,dict_all,GetAllinfo):
# 获取传入的参数
GetAllinfo_new=GetAllinfo["ri"]
if GetAllinfo["ri"] and type(GetAllinfo["ri"])=="str":
print("111111111",type(GetAllinfo["ri"]),GetAllinfo["ri"])
GetAllinfo_new = eval(GetAllinfo_new) # 如果key不存在,会返回None,因此需进行判断
print("===================================================获取ri部分====\n",GetAllinfo_new)
for factors_name, value in dict_all.items():
try:
status_get=GetAllinfo_new[factors_name]
# print(type(status_get),status_get)
if type(status_get)==dict:
dict_all[factors_name]["status"] = status_get
# print("____",dict_all)
if type(status_get) == str:
# print("%%%",status_get,eval(status_get))
status_get=eval(status_get)
dict_all[factors_name]["status"] = status_get
# print("修改后的dict_all",dict_all)
except:
pass
# 计算总体风险异常比例、以及被监控风险比例和被稽查风险比例
qualified = [] # 合格
disqualified = [] # 不合格
for k, v in dict_all.items():
s_status = v['status']["status_key"]
s_score = v['s_score']
if s_status == 1:
qualified.append(s_score)
if s_status == 2:
disqualified.append(s_score)
A_count = sum(qualified) + sum(disqualified) # 计算所有合格与不合格对应指标权重之和(分母)
print(A_count)
B_count = sum(disqualified) # 计算所有不合格对应指标权重之和(分子)
print(B_count)
ALLRisk = {}
if A_count!=0:
r1 = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
print(r1, type(r1))
t_r1 = str(int(r1 * 100)) + "%"
t_r2 = str(int((r1 - 0.29) * 100)) + "%"
t_r3 = str(int((r1 - 0.49) * 100)) + "%"
standard1 = 0.3
standard2 = 0.5
# 建立第一层规则
rules = {r1 <= standard1: {"t_r1": t_r1, "t_r2": "1%", "t_r3": "1%"},
r1 > standard1 and r1 <= standard2: {"t_r1": t_r1, "t_r2": t_r2, "t_r3": "1%"},
r1 > standard2: {"t_r1": t_r1, "t_r2": t_r2, "t_r3": t_r3}}
for k1, v1 in rules.items():
if k1:
ALLRisk = v1
# print("+++++++++++++",ALLRisk)
return dict_all, ALLRisk
# 输入计算出的风险异常比例,返回提示结果
def RiskAbnormal(t_r1):
RiskA = {}
t_r1_s = str(int(t_r1.replace("%", "")) / 100)
rules_1 = {"t_r1_s<='0.3'": "企业风险异常同比其他企业比例较低",
"t_r1_s>'0.3'": "企业风险异常同比其他企业比例明显偏高"
}
for k1, v1 in rules_1.items():
if eval(str(k1)):
RiskA = {"score":t_r1,"describe":v1}
return RiskA
# 输入计算出的被检测风险值,判断被监测风险结果
def MonitoredRisk(t_r2):
MRisk = {}
t_r2_s = str(int(t_r2.replace("%", "")) / 100)
rules_2 = {"t_r2_s<='0.1'": "企业被监控的风险同比其他企业比例较低",
"t_r2_s>'0.1' and t_r2_s<='0.5'": "经检测,企业有一定可能性被选为税务监控对象",
"t_r2_s>'0.5'": "经检测,企业有很大可能性被选为税务监控对象"
}
for k2, v2 in rules_2.items():
if eval(str(k2)):
MRisk = {"score": t_r2, "describe": v2}
return MRisk
# 输入计算出的风险异常比例,返回提示结果
def RiskAbnormal(self,t_r1):
RiskA = {}
t_r1_s = str(int(t_r1.replace("%", "")) / 100)
rules_1 = {"t_r1_s<='0.3'": "企业风险异常同比其他企业比例较低",
"t_r1_s>'0.3'": "企业风险异常同比其他企业比例明显偏高"
}
for k1, v1 in rules_1.items():
if eval(str(k1)):
RiskA = {"score":t_r1,"describe":v1}
return RiskA
# 输入计算出的被稽查风险值,判断被稽查风险结果
def AuditedRisk(t_r3):
ARisk = {}
t_r3_s = str(int(t_r3.replace("%", "")) / 100)
rules_3 = {"t_r3_s<='0.1'": "企业被税务稽查的可能性较低",
"t_r3_s>'0.1' and t_r3_s<='0.3'": "企业有一定可能性被税务稽查",
"t_r3_s>'0.3'": "企业有很大可能性被税务稽查"
}
for k3, v3 in rules_3.items():
if eval(str(k3)):
ARisk = {"score":t_r3,"describe":v3}
return ARisk
# 输入计算出的被检测风险值,判断被监测风险结果
def MonitoredRisk(self,t_r2):
MRisk = {}
t_r2_s = str(int(t_r2.replace("%", "")) / 100)
rules_2 = {"t_r2_s<='0.1'": "企业被监控的风险同比其他企业比例较低",
"t_r2_s>'0.1' and t_r2_s<='0.5'": "经检测,企业有一定可能性被选为税务监控对象",
"t_r2_s>'0.5'": "经检测,企业有很大可能性被选为税务监控对象"
}
for k2, v2 in rules_2.items():
if eval(str(k2)):
MRisk = {"score": t_r2, "describe": v2}
return MRisk
# 输入总体计算的风险异常比例,得到提示语(也可与RiskAbnormal函数合并)
def information(t_r1):
info = {}
t_r1_s = str(int(t_r1.replace("%", "")) / 100)
rules_0 = {"t_r1_s<='0.3'": "贵企业的风险异常情况同比其他企业较少,如需更多更详细的诊断服务,请联系我们>>",
"t_r1_s>'0.3' and t_r1_s<='0.6'": "企业的风险异常情况同比其他企业明显偏多,有较大可能性成为税务系统监控的对象,请企业的财税人员参考以下详细检测情况进行自查,如需进行更详细的诊断服务请联系我们>>",
"t_r1_s>'0.6' and t_r1_s<='0.8'": "如果税务局选择企业所在行业进行行业稽查,该企业有一定可能性成为被选案例,请财税人员尽快参照以下比例进行自查,如需进行更详细的诊断服务请联系我们>>",
"t_r1_s>'0.8'": "如果税务局选择企业所在行业进行行业稽查,该企业有很大可能性成为被选案例,请财税人员尽快参照以下比例进行自查,如需进行更详细的诊断服务请联系我们>>"
}
for k0, v0 in rules_0.items():
if eval(str(k0)):
info = {"score":t_r1,"describe":v0}
return info
# 输入计算出的被稽查风险值,判断被稽查风险结果
def AuditedRisk(self,t_r3):
ARisk = {}
t_r3_s = str(int(t_r3.replace("%", "")) / 100)
rules_3 = {"t_r3_s<='0.1'": "企业被税务稽查的可能性较低",
"t_r3_s>'0.1' and t_r3_s<='0.3'": "企业有一定可能性被税务稽查",
"t_r3_s>'0.3'": "企业有很大可能性被税务稽查"
}
for k3, v3 in rules_3.items():
if eval(str(k3)):
ARisk = {"score":t_r3,"describe":v3}
return ARisk
# 输入总体计算的风险异常比例,得到提示语(也可与RiskAbnormal函数合并)
def information(self,t_r1):
info = {}
t_r1_s = str(int(t_r1.replace("%", "")) / 100)
rules_0 = {"t_r1_s<='0.3'": "贵企业的风险异常情况同比其他企业较少,如需更多更详细的诊断服务,请联系我们>>",
"t_r1_s>'0.3' and t_r1_s<='0.6'": "企业的风险异常情况同比其他企业明显偏多,有较大可能性成为税务系统监控的对象,请企业的财税人员参考以下详细检测情况进行自查,如需进行更详细的诊断服务请联系我们>>",
"t_r1_s>'0.6' and t_r1_s<='0.8'": "如果税务局选择企业所在行业进行行业稽查,该企业有一定可能性成为被选案例,请财税人员尽快参照以下比例进行自查,如需进行更详细的诊断服务请联系我们>>",
"t_r1_s>'0.8'": "如果税务局选择企业所在行业进行行业稽查,该企业有很大可能性成为被选案例,请财税人员尽快参照以下比例进行自查,如需进行更详细的诊断服务请联系我们>>"
}
for k0, v0 in rules_0.items():
if eval(str(k0)):
info = {"score":t_r1,"describe":v0}
return info
#获取描述信息的方法
def GetResult(risk_factor_dict,status_value):
desc_info=risk_factor_dict["factors_describe"]
if status_value=="big" or status_value=="small":
try:
dict_new=eval(desc_info)
tempstr='''{{dict[key]}}'''
template = Template(tempstr)
factor_desc=template.render(dict=dict_new,key=status_value)
print(factor_desc)
except:
factor_desc="没有获取到风险描述信息"
else:
factor_desc=desc_info
return factor_desc
#通用函数,输入不同风险监测项列表list,得到指标对应描述信息list
def GetRiskDesc(dict_all,risklist):
new_dict={}# 建立风险检测项字典
for f1 in risklist:
new_dict[f1] = dict_all[f1]
# 判断所有指标是否为缺失
Risk_not_null = {}
for k1, v1 in new_dict.items():
s_status1 = v1["status"]["status_key"]
if s_status1 > 0:
Risk_not_null[k1] = new_dict[k1]
p1_describe = []
if Risk_not_null:
for k_part1, v_part1 in Risk_not_null.items():
s_status1 = v_part1["status"]["status_key"]
s_status2 = v_part1["status"]["status_value"]
if s_status1 == 2:
s_str = GetResult(v_part1, s_status2)
p1_describe.append(s_str)
else:
p1_describe = ["贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"]
return p1_describe
#获取描述信息的方法
def GetResult(self,risk_factor_dict,status_value):
desc_info=risk_factor_dict["factors_describe"]
if status_value=="big" or status_value=="small":
try:
dict_new=eval(desc_info)
tempstr='''{{dict[key]}}'''
template = Template(tempstr)
factor_desc=template.render(dict=dict_new,key=status_value)
print(factor_desc)
except:
factor_desc="没有获取到风险描述信息"
else:
factor_desc=desc_info
return factor_desc
#通用函数,输入不同风险监测项列表list,得到指标对应描述信息list
def GetRiskDesc(self,dict_all,risklist):
new_dict={}# 建立风险检测项字典
for f1 in risklist:
new_dict[f1] = dict_all[f1]
# 判断所有指标是否为缺失
Risk_not_null = {}
for k1, v1 in new_dict.items():
s_status1 = v1["status"]["status_key"]
if s_status1 > 0:
Risk_not_null[k1] = new_dict[k1]
p1_describe = []
if Risk_not_null:
for k_part1, v_part1 in Risk_not_null.items():
s_status1 = v_part1["status"]["status_key"]
s_status2 = v_part1["status"]["status_value"]
if s_status1 == 2:
s_str = self.GetResult(v_part1, s_status2)
p1_describe.append(s_str)
else:
p1_describe = ["贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"]
return p1_describe
#整合调用前面构建函数,并结合传入数据建立风险监测结果函数
def Riskinfo(GetAllinfo):
dict_all, ALLRisk = CreatBdictFromJson(dict_all_v1, GetAllinfo)
RiskA = RiskAbnormal(ALLRisk["t_r1"]) # 风险异常比例
MRisk = MonitoredRisk(ALLRisk["t_r2"]) # 被监控风险
ARisk = AuditedRisk(ALLRisk["t_r3"]) # 被稽查风险
info = information(ALLRisk["t_r1"]) # 提示语
# 计算所得税隐藏收入风险异常
IT_YCSR_list = tax_desc.get("IT_YCSR") # 获取所得税隐藏收入指标名称
Risk_IT_YCSR=GetRiskDesc(dict_all,IT_YCSR_list) # 企业所得税隐藏收入风险信息
# 计算所得税虚增费用风险异常
IT_XZFY_list = tax_desc.get("IT_XZFY") # 获取所得税虚增费用指标名称
Risk_IT_XZFY=GetRiskDesc(dict_all,IT_XZFY_list) # 企业所得税隐藏收入风险信息
# 计算所得税虚增成本风险异常
IT_XZCB_list = tax_desc.get("IT_XZCB") # 获取所得税虚增成本指标名称
Risk_IT_XZCB = GetRiskDesc(dict_all,IT_XZCB_list)# 企业所得税虚增成本风险信息
# 计算增值税虚开发票风险异常
VAT_XKFP_list = tax_desc.get("VAT_XKFP") # 获取增值税虚开发票指标名称
Risk_VAT_XKFP = GetRiskDesc(dict_all,VAT_XKFP_list) # 企业增值税虚开发票风险信息
# 计算增值税虚增进项风险异常
VAT_XZJX_list = tax_desc.get("VAT_XZJX") # 获取增值税虚增进项指标名称
Risk_VAT_XZJX = GetRiskDesc(dict_all,VAT_XZJX_list)# 企业增值税虚增进项风险信息
# 计算增值税隐藏销项风险异常
VAT_YCXX_list = tax_desc.get("VAT_YCXX") # 获取增值税隐藏销项指标名称
Risk_VAT_YCXX = GetRiskDesc(dict_all,VAT_YCXX_list)# 企业增值税隐藏销项风险信息
# 计算消费税风险异常
CT_FX_list = tax_desc.get("CT_FX") # 获取消费税指标名称
Risk_CT_FX = GetRiskDesc(dict_all,CT_FX_list) # 消费税风险信息
# 综合
ALLT_FX_list = tax_desc.get("ALLT_FX") # 获取综合风险指标名称
Risk_ALLT_FX =GetRiskDesc(dict_all,ALLT_FX_list) # 综合风险信息
RI_result = {
"RiskA": RiskA,
"MRisk": MRisk,
"ARisk": ARisk,
"info": info,
"Risk_IT_YCSR": Risk_IT_YCSR,
"Risk_IT_XZFY": Risk_IT_XZFY,
"Risk_IT_XZCB": Risk_IT_XZCB,
"Risk_VAT_XKFP": Risk_VAT_XKFP,
"Risk_VAT_XZJX": Risk_VAT_XZJX,
"Risk_VAT_YCXX": Risk_VAT_YCXX,
"Risk_CT_FX": Risk_CT_FX,
"Risk_ALLT_FX": Risk_ALLT_FX
#整合调用前面构建函数,并结合传入数据建立风险监测结果函数
def Riskinfo(self,GetAllinfo):
dict_all_new, ALLRisk = self.CreatBdictFromJson(self.dict_all_v1, GetAllinfo)
print("&&&&生成新的大字典\n",ALLRisk,dict_all_new)
if ALLRisk!={}:
RiskA = self.RiskAbnormal(ALLRisk["t_r1"]) # 风险异常比例
MRisk = self.MonitoredRisk(ALLRisk["t_r2"]) # 被监控风险
ARisk = self.AuditedRisk(ALLRisk["t_r3"]) # 被稽查风险
info = self.information(ALLRisk["t_r1"]) # 提示语
else:
RiskA ={"score":"0%","describe":"贵公司提供的风险检测数据缺失,请补充完整后进行风险异常测试"} # 风险异常比例
MRisk ={"score":"0%","describe":"贵公司提供的风险检测数据缺失,请补充完整后进行被监控风险测试"} # 被监控风险
ARisk ={"score":"0%","describe":"贵公司提供的风险检测数据缺失,请补充完整后进行被稽查风险测试"} # 被稽查风险
info ={"score":"0%","describe":"贵公司提供的风险检测数据缺失,无法进行相关风险检测,请财税人员尽快补充完整后进行测试,如需进行更详细的诊断服务请联系我们"} # 提示语
# 计算所得税隐藏收入风险异常
IT_YCSR_list = self.tax_desc.get("IT_YCSR") # 获取所得税隐藏收入指标名称
Risk_IT_YCSR=self.GetRiskDesc(dict_all_new,IT_YCSR_list) # 企业所得税隐藏收入风险信息
print("#企业所得税隐藏收入风险",Risk_IT_YCSR)
# 计算所得税虚增费用风险异常
IT_XZFY_list = self.tax_desc.get("IT_XZFY") # 获取所得税虚增费用指标名称
Risk_IT_XZFY=self.GetRiskDesc(dict_all_new,IT_XZFY_list) # 企业所得税隐藏收入风险信息
# 计算所得税虚增成本风险异常
IT_XZCB_list = self.tax_desc.get("IT_XZCB") # 获取所得税虚增成本指标名称
Risk_IT_XZCB = self.GetRiskDesc(dict_all_new,IT_XZCB_list)# 企业所得税虚增成本风险信息
# 计算增值税虚开发票风险异常
VAT_XKFP_list = self.tax_desc.get("VAT_XKFP") # 获取增值税虚开发票指标名称
Risk_VAT_XKFP = self.GetRiskDesc(dict_all_new,VAT_XKFP_list) # 企业增值税虚开发票风险信息
# 计算增值税虚增进项风险异常
VAT_XZJX_list = self.tax_desc.get("VAT_XZJX") # 获取增值税虚增进项指标名称
Risk_VAT_XZJX = self.GetRiskDesc(dict_all_new,VAT_XZJX_list)# 企业增值税虚增进项风险信息
# 计算增值税隐藏销项风险异常
VAT_YCXX_list = self.tax_desc.get("VAT_YCXX") # 获取增值税隐藏销项指标名称
Risk_VAT_YCXX = self.GetRiskDesc(dict_all_new,VAT_YCXX_list)# 企业增值税隐藏销项风险信息
# 计算消费税风险异常
CT_FX_list = self.tax_desc.get("CT_FX") # 获取消费税指标名称
Risk_CT_FX = self.GetRiskDesc(dict_all_new,CT_FX_list) # 消费税风险信息
# 综合
ALLT_FX_list = self.tax_desc.get("ALLT_FX") # 获取综合风险指标名称
Risk_ALLT_FX =self.GetRiskDesc(dict_all_new,ALLT_FX_list) # 综合风险信息
}
print(RI_result, "------------------------------")
return RI_result
RI_result = {
"RiskA": RiskA,
"MRisk": MRisk,
"ARisk": ARisk,
"info": info,
"Risk_IT_YCSR": Risk_IT_YCSR,
"Risk_IT_XZFY": Risk_IT_XZFY,
"Risk_IT_XZCB": Risk_IT_XZCB,
"Risk_VAT_XKFP": Risk_VAT_XKFP,
"Risk_VAT_XZJX": Risk_VAT_XZJX,
"Risk_VAT_YCXX": Risk_VAT_YCXX,
"Risk_CT_FX": Risk_CT_FX,
"Risk_ALLT_FX": Risk_ALLT_FX
}
# print(RI_result, "------------------------------")
return RI_result
......
......@@ -7,7 +7,7 @@
from getcompanyinfofromES import GetComanyinfoFromES
from GetFIinfoFromJson import GetFIinfoFromJson
from GetRiskinfo import Riskinfo
from GetRiskinfo import getriskinfo
import time,requests,oss2,sys,os,json
from flask import Flask ,request
from docxtpl import DocxTemplate,InlineImage
......@@ -148,9 +148,6 @@ def WriteReport(Else_info,Companyinfo,Fi_dict,Risk_info):
context['guage1'] = InlineImage(tpl, guage1, width=Mm(60))
context['guage2'] = InlineImage(tpl, guage2, width=Mm(60))
context['guage3'] = InlineImage(tpl, guage3, width=Mm(60))
context['RiskA_desc'] ="企业风险异常同比其他企业比例较低"
context['MRisk_desc'] = "企业风险异常同比其他企业比例较低"
context['ARisk_desc'] = "企业风险异常同比其他企业比例较低"
# 风险信息
context["RiskA_desc"] = Risk_info["RiskA"]["describe"]
context["MRisk_desc"] = Risk_info["MRisk"]["describe"]
......@@ -223,7 +220,7 @@ def report():
data = request.json
# print(data)
# print("ccccccccccccccccccccccccccccc")
# json_str = json.dumps(data)
# json_str = json.dumps(data, ensure_ascii=False)
# print(json_str)
GetAllinfo = json.loads(data)
companyname=GetAllinfo["company_name"] #获取监测企业名称
......@@ -232,7 +229,8 @@ def report():
if companyname:
Companyinfo=GetComanyinfoFromES(companyname) #根据输入公司名获取工商信息
Else_info, Fi_dict=GetFIinfoFromJson(GetAllinfo)#获取输入其他信息和财务信息
Risk_info=Riskinfo(GetAllinfo)#计算风险项
riskinfo = getriskinfo()
Risk_info=riskinfo.Riskinfo(GetAllinfo)#计算风险项
report=None
try:
report = WriteReport(Else_info,Companyinfo,Fi_dict,Risk_info) #将工商、财务、风险三部分内容写入word中
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment