Files
Esmart/lib.py
2025-09-28 15:51:09 +08:00

1201 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# !/usr/bin/env python
# -*- coding:utf-8 -*-
import base64
import json
import time
from abc import abstractmethod
import requests
class WXworkAuth:
def __init__(self, app_id, app_key):
self.crop_id = app_id
self.crop_secret = app_key
self.access_token = None
self.timestamp = None
def _get_access_token(self):
self.timestamp = int(time.time())
url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.crop_id}&corpsecret={self.crop_secret}"
response = requests.get(url)
access_info = response.json()
if 'access_token' in access_info:
self.access_token = access_info['access_token']
else:
raise Exception('获取access_token失败')
def get_access_token(self):
_now = int(time.time())
if not self.access_token or (_now - self.timestamp) > 7200:
self.refresh_token()
return self.access_token
def refresh_token(self):
try:
self._get_access_token()
except Exception as e:
print(e, '重新获取')
self._get_access_token()
class CloudFunctionAPI:
def __init__(self, base_url):
self.base_url = base_url
def get_user_info_by_id(self, user_id: str) -> dict:
url = f"{self.base_url}/in/api/user"
body = {
"uid": user_id
}
response = requests.get(url, params=body)
if response.status_code != 200:
return {}
user_info = response.json().get('data', {})
return user_info
def get_user_info_by_sec(self, security_code: str) -> list:
url = f"{self.base_url}/in/api/userQuery"
body = {
"secCode": security_code,
"status": 10,
"PageSize": 10,
"PageNo": 1
}
response = requests.post(url, json=body)
if response.status_code != 200:
return []
user_info = response.json().get("list")
if user_info is None:
user_info = [{}]
return user_info
def get_user_info_by_name(self, user_name: str) -> dict:
url = f"{self.base_url}/in/api/userQuery"
body = {
"loginName": user_name,
"status": 10,
"PageSize": 10,
"PageNo": 1
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
user_info = response.json().get("list")
if user_info is None:
user_info = {}
else:
user_info = user_info[0]
return user_info
@staticmethod
def parse_tags(tags: list):
def _parse_tag(_tag: str):
return _tag.split("_")
temp_dict = {}
for tag in tags:
tag_split = _parse_tag(tag)
size = len(tag_split)
cursor = None
for idx, i in enumerate(tag_split):
if idx == 0:
cursor = temp_dict
val = {} if idx < size - 1 else True
cursor[i] = val
if isinstance(val, bool):
cursor = None
else:
cursor = cursor[i]
print(temp_dict)
return temp_dict
def get_user_tags(self, user_id: str = "", user_name: str = "") -> dict:
if user_id:
user_info = self.get_user_info_by_id(user_id)
elif user_name:
user_info = self.get_user_info_by_name(user_name)
else:
return {}
tags = user_info.get('tags', [])
return self.parse_tags(tags)
def update_user_tag(self, user_id: str, key: str, value: str):
url = f"{self.base_url}/in/api/userTags"
body = {
"userId": user_id,
"tags": [f"{key}_{value}"]
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
user_info = response.json()
return user_info
def update_user_tag_by_name(self, user_name: str, key: str, value: str):
user_info = self.get_user_info_by_name(user_name)
user_id = user_info.get('id')
if not user_id:
return {}
return self.update_user_tag(user_id, key, value)
def update_user_tags(self, user_id: str, tags: dict):
url = f"{self.base_url}/in/api/userTags"
body = {
"userId": user_id,
"tags": [f"{key}_{value}" for key, value in tags.items()]
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
user_info = response.json()
return user_info
class WXworkBaseAPI:
def __init__(self, auth: WXworkAuth):
self.auth = auth
@property
def access_token(self):
return self.auth.get_access_token()
class WXworkApprovalAPI(WXworkBaseAPI):
def get_approval_list(self, template_id: str, start_time: int, end_time: int) -> list:
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovalinfo?access_token={self.access_token}"
body = {
"starttime": str(start_time),
"endtime": str(end_time),
"new_cursor": "",
"size": 100,
"filters": [
{
"key": "template_id",
"value": template_id
},
{
"key": "sp_status",
"value": "2"
}
]
}
response = requests.post(url, json=body)
if response.status_code != 200:
return []
else:
approval_info = response.json()
return approval_info["sp_no_list"]
def get_approval_info(self, approval_id: str) -> dict:
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovaldetail?access_token={self.access_token}"
body = {
"sp_no": approval_id
}
response = requests.post(url, json=body)
approval_info = response.json()
return approval_info
def get_approval_list_weekly(self, template_id) -> list:
current_time = time.time()
# 获取当前时间的结构体
current_local_time = time.localtime(current_time)
# 星期几0=星期一6=星期天)
weekday = current_local_time.tm_wday
# 计算下一个星期天的天数
days_until_sunday = (6 - weekday) % 7
this_weekday = int(current_time + (days_until_sunday * 86400))
last_weekday = int(this_weekday - 7 * 24 * 3600)
return self.get_approval_list(template_id, last_weekday, this_weekday)
class WXworkCalendarAPI(WXworkBaseAPI):
def create_calendar(self, calendar_name, userid):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/add?access_token={self.access_token}"
body = {
"calendar": {
"admins": [
userid
],
"set_as_default": 1,
"summary": calendar_name,
"color": "#FF3030",
"description": "访问日历",
"is_public": 0,
"is_corp_calendar": 0,
"shares": [
{
"userid": userid,
"permission": 1
},
],
}
}
response = requests.post(url, json=body)
cinfo = response.json()
if "cal_id" in cinfo:
return cinfo["cal_id"]
else:
return cinfo
def delete_calendar(self, calendar_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/del?access_token={self.access_token}"
body = {
"cal_id": calendar_id
}
response = requests.post(url, json=body)
calendar_info = response.json()
return calendar_info
def get_calendar_list(self, calendar_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get_by_calendar?access_token={self.access_token}"
body = {
"cal_id": calendar_id,
"offset": 0,
"limit": 1000
}
response = requests.post(url, json=body)
calendar_list = response.json()
return calendar_list
def create_schedule(self, calendar_id, schedule_name, start_time, end_time, user_id, content, address=""):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/add?access_token={self.access_token}"
if start_time == end_time:
end_time = str(int(start_time) + 1)
body = {
"schedule": {
"admins": [
user_id
],
"start_time": start_time,
"end_time": end_time,
"is_whole_day": 0,
"attendees": [{
"userid": user_id
}],
"summary": schedule_name,
"description": content,
"reminders": {
"is_remind": 1,
"remind_before_event_secs": 3600,
"timezone": 8
},
"location": address,
"cal_id": calendar_id
}
}
response = requests.post(url, json=body)
schedule_info = response.json()
return schedule_info
def get_schedule(self, schedule_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get?access_token={self.access_token}"
body = {
"schedule_id_list": [
schedule_id
]
}
response = requests.post(url, json=body)
schedule_list = response.json()
return schedule_list
def delete_schedule(self, schedule_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/del?access_token={self.access_token}"
body = {
"schedule_id": schedule_id
}
response = requests.post(url, json=body)
schedule_info = response.json()
return schedule_info
class WXworkUserInfoAPI(WXworkBaseAPI):
def get_user_info(self, user_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token={self.access_token}&userid={user_id}"
response = requests.get(url)
user_info = response.json()
return user_info
class WXworkDocAPI:
def __init__(self, auth: WXworkAuth, space_id):
self.space_id = space_id
self.auth = auth
self.access_token = self.auth.get_access_token()
def create_space(self, space_name):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedrive/space_create?access_token={self.access_token}"
body = {
"space_name": space_name,
"auth_info": [{
"type": 1,
"userid": "lychang",
"auth": 7
}],
"space_sub_type": 0
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_space_info(self):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedrive/space_info?access_token={self.access_token}"
body = {
"spaceid": self.space_id
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_space_file_list(self):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedrive/file_list?access_token={self.access_token}"
body = {
"spaceid": self.space_id,
"fatherid": self.space_id,
"sort_type": 1,
"start": 0,
"limit": 100
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_document_info(self, doc_id):
self.doc_id = doc_id
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/document/get?access_token={self.access_token}"
body = {
"docid": self.doc_id
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_table_info(self, doc_id, sheet_id, view_ids: list):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_views?access_token={self.access_token}"
body = {
"docid": doc_id,
"sheet_id": sheet_id,
"view_ids": view_ids,
"offset": 0,
"limit": 1
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_table_data(self, doc_id, sheet_id, view_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_records?access_token={self.access_token}"
body = {
"docid": doc_id,
"sheet_id": sheet_id,
"view_id": view_id,
"record_ids": [],
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
"field_titles": [],
"field_ids": [],
"sort": [],
"offset": 0,
"limit": 100
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_sheet_data(self, doc_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_sheet?access_token={self.access_token}"
body = {
"docid": doc_id
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def create_table(self, doc_name, user_ids: list, father_id=None):
"""
doc_type 文档类型, 3:文档 4:表格 10:智能表格
"""
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/create_doc?access_token={self.access_token}"
body = {
"doc_type": 10,
"doc_name": doc_name,
"admin_users": user_ids
}
if self.space_id:
body["spaceid"] = self.space_id
if father_id:
body["fatherid"] = father_id
else:
body["fatherid"] = self.space_id
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def add_row(self, doc_id, sheet_id, records: list):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/add_records?access_token={self.access_token}"
body = {
"docid": doc_id,
"sheet_id": sheet_id,
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
"records": records
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def parse_data(data: dict, mapping: dict) -> dict:
user_id = data.get("applyer").get("userid")
app_data = data.get("apply_data").get("contents")
_info = {}
for i in app_data:
key = i.get("title")[0]["text"]
cp_id = i.get("id")
control = i.get("control")
value = i.get("value")
if control == "Text":
result = value.get("text")
elif control == "Number":
result = value.get("new_number")
elif control == "Date":
result = value.get("date").get("s_timestamp")
elif control == "Selector":
_t = value.get("selector").get("type")
if _t == "multi":
opts = value.get("selector").get("options", [])
result = [j["key"] for j in opts]
else:
opts = i.get("selector").get("options", [])
result = opts[0]["key"]
else:
result = ""
_info[cp_id] = {"key": key, "value": result}
_r = {v: _info.get(k, {}).get("value") for k, v in mapping.items()}
_r["_uid"] = user_id
return _r
crop_id = "wx98dfa35ad7e4b271"
space_id = "s.wx98dfa35ad7e4b271.744621152iSk"
f_id = 'dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw'
wxc_app_secret = "gyN5moBa6Ev1Vd0ZLUVrsEtAO_goppWKUBdsnSng-Ks"
wapi_app_secret = "oM0hwIi8GRPw6HWk9o5__g3v5ziz5CGyBUo2FASSrVw"
wxc_auth = WXworkAuth(crop_id, wxc_app_secret)
wapi_auth = WXworkAuth(crop_id, wapi_app_secret)
cf_api = CloudFunctionAPI("http://10.0.0.39/cloud")
wxc_api = WXworkCalendarAPI(wxc_auth)
wapi = WXworkApprovalAPI(wapi_auth)
wup = WXworkUserInfoAPI(wapi_auth)
ssp = WXworkDocAPI(wxc_auth, space_id)
def file_exists(file_path):
_url = f"https://a.cmdp.cn/basiceg/v1/csp/exist/{file_path}"
response = requests.get(_url.format(file_path=file_path))
return response.json().get("exist", False)
class CalendarDB:
def __init__(self):
self.name = None
self.user_id = None
self.user_info = None
self.calendar_id = None
self.calendar_connection = None
self.cloud_connection = None
def login(self, login_user: str):
self.name = login_user
self.calendar_connection = wxc_api
self.cloud_connection = cf_api
user_info = self.cloud_connection.get_user_info_by_name(user_name=self.name)
self.user_id = user_info.get('id')
user_tags = self.cloud_connection.parse_tags(user_info.get('tags', []))
calendar_ids = [i for i in user_tags.get('calendar', {}).get('id', {})]
self.calendar_id = calendar_ids[0] if calendar_ids else None
if not self.calendar_id:
self.calendar_id = self.create_calendar("访问日历")
if self.calendar_id:
self.cloud_connection.update_user_tag(user_id=self.user_id, key="calendar_id", value=self.calendar_id)
return {"calendar": self.calendar_id}
def create_calendar(self, calendar_name: str):
return self.calendar_connection.create_calendar(calendar_name, self.name)
@abstractmethod
def _transfer_calendar_data(self, data: dict) -> dict:
pass
def get_all(self):
return self.calendar_connection.get_calendar_list(self.calendar_id)
def add(self, data: dict):
_data = data.copy()
data.pop("_uid")
schedule_name = f"{data.get('company_name')}[{data.get('company_code')}] 拜访计划"
start_time = data.get("visit_time")
end_time = data.get("end_time")
content = json.dumps(data, ensure_ascii=False)
address = data.get("company_address")
self.calendar_connection.create_schedule(self.calendar_id, schedule_name, start_time, end_time, self.name,
content,
address)
return self.trigger(_data)
@abstractmethod
def trigger(self, data: dict):
pass
def delete(self, schedule_id: str):
self.trigger({})
pass
def get_report_ticket(file_path: str, download_name: str):
"""
四.生成文件下载地址生成【通用】
说明
调用接口获取ticket然后拼接地址后发给用户
接口地址
https://a.cmdp.cn/v1/cloudfile/file/createTicket
输入参数[post]
{
"appId": "",
"uri": "s3://usercentre03-dev/user/documents/my document/00002rfdata11792529c9f6e136e282b6416ff20d3f.json", //文件的s3地址
"startTime": "2024-03-21 12:12:12",
"endTime": "",
"maxTimes": 0,
"uid": "",
"name": "1.json", //下载时的文件名
"isPublic":false
}
返回示例:
{
"code": 200,
"msg": null,
"data": "LCXNDEwla",
"validations": null,
"token": null,
"success": true
}
拼接文件下载地址
https://ea.cmdp.cn/v1/cloudfile/file/?ticket=LCXNDEwla
"""
url = "https://a.cmdp.cn/v1/cloudfile/file/createTicket"
body = {
"appId": "",
"uri": file_path,
# 文件的s3地址
"startTime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"endTime": "",
"maxTimes": 0,
"uid": "",
"name": download_name, # 下载时的文件名
"isPublic": False
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
ticket = response.json().get("data")
download_url = f"https://a.cmdp.cn/v1/cloudfile/file/?ticket={ticket}"
return download_url
class SalesTools:
@staticmethod
def get_user_info(security_code: str):
"""
二.用户列表查询
说明
【同步调用】
接口地址
http://10.0.0.39/cloud/in/api/userQuery
输入参数[post]
{
"secCode": "000002",
"status":10,
"PageSize": 10,
"PageNo": 1
}"""
return cf_api.get_user_info_by_sec(security_code)
@staticmethod
def update_user_tag(user_id: str):
"""
三.用户打标签
说明
【同步调用】
接口地址
http://10.0.0.39/cloud/in/api/user
输入参数[post]
{
"id": "1722805021193383937",
"tags": [
"客户类型_上市公司",
"客户权限_动态令牌"
]
}
"""
cf_api.update_user_tags(user_id, {"客户类型": "上市公司", "客户权限": "动态令牌"})
@staticmethod
def get_org_info(security_code: str):
"""
四.通用机构信息查询
说明
接口地址
http://ea.cmdp.cn/apm/v1/ability/api000353
输入参数[post]
{
"_data": {
"searchType": "stock",
"secCode": "000002" //证券代码
}
}
"""
url = "http://a.cmdp.cn/apm/v1/ability/api000353"
body = {
"_data": {
"searchType": "stock",
"secCode": security_code
}
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
return response.json().get("data", {})
@staticmethod
def get_report_rule(tags: list):
"""
四.尽调报告的规则文件地址查询
说明
接口地址
https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_list/usys/pbcs/release
输入参数[post]
{
"opUserId": "genesys_app",
"sourceUserId": "genesys_app",
"filter":"'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.pbcMeta.name' eq '尽调报告云盘项目名称,需要问@冯昕宇'",【企业售前画像报告】
"tags":["报告年度_2023","biz_项目主体类别_单体","biz_业务场所_深交所"]
}
"""
url = "https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_list/usys/pbcs/release"
body = {
"opUserId": "genesys_app",
"sourceUserId": "genesys_app",
"filter": f"'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.pbcMeta.name' eq '企业售前画像报告'",
"tags": tags
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
return response.json()
@staticmethod
def get_report_info(user_id: str, security_code: str, rule_path: str):
"""
四.尽调报告执行
说明
执行后报告需要生成ticket的下载地址
接口地址
https://a.cmdp.cn/v1/cmp/cal/fn/PerformRuleFile
输入参数[post]
{
"issync": 2,
"ruleobj": "temp-003/rule.json",
"dataobj": {
"uid": "用户ID", //填入真实用户ID执行后会在微信公众收到结果消息可以直接打开报告
"_data": {
"secCode": "000002"
},
"_var": {
"config": {
"prodName": "企业售前画像报告"
}
}
}
}
"""
url = "https://a.cmdp.cn/v1/cmp/cal/fn/PerformRuleFile"
body = {
"issync": 2, # 同步执行
"ruleobj": rule_path,
"dataobj": {
"uid": user_id,
"_data": {
"secCode": security_code
},
"_var": {
"config": {
"prodName": "企业售前画像报告"
}
}
}}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
return response.json().get("dumpmsg", {}).get("_out", {})
@staticmethod
def get_regression_testing(eid: str):
"""
执行后报告需要生成ticket的下载地址2024年半年报
@卢天宝说明一下文件存储路径,@常连钰直接根据文件存储路径生成下载ticket但是需要判断是否存在https://a.cmdp.cn/basiceg/v1/csp/exist/{path}
存储路径announce02/{textId}._val_01.json , announce02/{textId}._val_01.docx
textId获取
请求地址posthttp://a.cmdp.cn/cnd/v1/entity/search/cmdb_bfmapping
参数
{
"tags": [
"报告期_2024QR1", //替换为真实报告期
"GE8002005E_100000000002334460" //GE8002005E_{eid}
]
}
返回示例:
{
"total": 1,
"records": [
{
"eid": "100000000037315576",
"tags": {
"公司名称_上海凌云实业发展股份有限公司": true,
"GE8002005E_100000000002334460": true,
"证券代码_900957": true,
"报告期_2024QR1": true,
"TEXTID_1219815286": true, //其中1219815286就是text的值
"公告标题_凌云凌云B股2024年第一季度报告": true
}
}
]
}
"""
def get_text_id(report_date, entity_id):
url = "https://a.cmdp.cn/cnd/v1/entity/search/cmdb_bfmapping"
body = {
"tags": [
f"报告期_{report_date}",
f"GE8002005E_{entity_id}"
]
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
result = response.json()
tags_ = result.get("records", [{}])
if tags_:
tags = tags_[0].get("tags", [{}])
else:
tags = []
text_id = None
for tag in tags:
l = tag.split("_")
if l[0] == "TEXTID":
text_id = l[1]
return text_id
def get_report_datetime():
report_datetime = time.localtime(time.time())
year = report_datetime.tm_year
month = report_datetime.tm_mon
if month >= 9:
return [f"{year}SAR", f"{year - 1}AR"]
elif month >= 4:
return [f"{year - 1}SAR", f"{year - 1}AR"]
else:
return [f"{year - 1}SAR", f"{year - 2}AR"]
def get_report(tid):
if not tid:
return None
docx_path = f"announce02/{tid}_val_01.docx"
json_path = f"announce02/{tid}_val_01.json"
if file_exists(json_path) and file_exists(docx_path):
return flag, get_report_ticket("s3://" + docx_path, "回归测试报告.docx")
else:
print("file not exist")
return None, None
flags = get_report_datetime()
result = ""
for flag in flags:
text_id = get_text_id(flag, eid)
f, r = get_report(text_id)
if r:
result += f'<br><p color="black">{f}:</p><a href="{r}" target="_blank" style="color: rgb(38, 126, 240); text-decoration-line: underline; outline-style: none; cursor: pointer; font-size: 16px; font-weight: 400;">{r}</a>'
if result != "":
return result
else:
return "暂无"
@staticmethod
def get_current_product_id():
now = time.localtime(time.time())
year = now.tm_year
month = now.tm_mon
if month >= 5:
year = year - 1
else:
year = year - 2
url = "https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_listall/usys/products/release"
body = {
"opUserId": "genesys_app",
"sourceUserId": "genesys_app",
"filter": "'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.resource.label' eq '小智智填-定期报告专版'",
"tags": ["tech_sbom", f"prod_year_{year}", "type_入口", "prod_小智转写"]
}
response = requests.post(url, json=body)
if response.status_code == 200:
response_json = response.json()
code = response_json.get("code", -1)
if code == 200:
data = response_json.get("data", {})
records = data.get("records", [])
if records:
widget_info = records[0].get("fileInfo", {}).get("extends", {}).get("widget", {})
product_id = widget_info.get("id")
return product_id
return None
@staticmethod
def get_smart_genie_product(entity_id: str, product_id: str):
"""
说明
选样本后的订购【打印或复制】小智类产品
内部接口eid和个人uid是输入的。
产品选择是输入产品id
产品 产品id 说明
小智复核-定期报告专版 已有产品,示例
小智智填-定期报告专版 1854436156570607618 产品id待补充
接口地址
https://cloud.cmdp.cn/api/product/deliver/print
输入参数[post]
{
"eid": "eid不填就是查询用户所属公司的",
"pid": "必填入口产品id",
"priceType":"产品版本,枚举:体验、标准版、旗舰版、特别优惠版";售前产品固定:标准版
}
输入示例
{
"eid": "100000000002334059",
"pid": "1858326969193848833",
"priceType":"标准版"
}
返回参数
{"Code":"状态码""data":"入口产品id"}
返回示例
{"Code":200"data":"1853245209740582914"}
打印成功后会有公众号通知
"""
url = "https://cloud.cmdp.cn/api/product/deliver/print"
body = {
"eid": entity_id,
"pid": product_id if product_id else "",
"priceType": "体验版"
}
response = requests.post(url, json=body)
if response.status_code == 200:
response_json = response.json()
code = response_json.get("code", -1)
if code == 200:
return response_json.get("data", "")
return None
@staticmethod
def get_product_qrcode(entity_id: str, product_id: str):
"""
接口示例:
标签查fid
curl --location --request POST "http://10.0.0.39/octopus/usr/genesys_app/file/query" ^
--header "User-Agent: Apifox/1.0.0 (https://apifox.com)" ^
--header "Content-Type: application/json" ^
--header "Accept: */*" ^
--header "Host: 10.0.0.39" ^
--header "Connection: keep-alive" ^
--data-raw "{ \"Category\": \"default\", \"ownerID\":\"genesys_app\", \"query\": \"\\\"uid_1652264045856165888\\\"\", \"path\":\"/system/users\", \"includeHistory\":false}"
fid查文件
curl --location --request GET "http://10.0.0.39/octopus/usr/genesys_app/file/1-23823/meta" ^
--header "User-Agent: Apifox/1.0.0 (https://apifox.com)" ^
--header "Accept: */*" ^
--header "Host: 10.0.0.39" ^
--header "Connection: keep-alive"
query查询标签
查询标签 【"eid_{eid}","pid_{pid}","priceType_标准版"
"""
def get_file_by_fid(file_id):
url = f"http://10.0.0.39/octopus/usr/genesys_app/file/{file_id}/meta"
try:
response = requests.get(url)
if response.status_code == 200:
return response.json()
else:
return {}
except Exception:
return {}
def get_file_id(entity_id: str, product_id: str):
url = "http://10.0.0.39/octopus/usr/genesys_app/file/query"
body = {
"Category": "default",
"ownerID": "genesys_app",
"query": f"\"eid_{entity_id}\" & \"pid_{product_id}\" & \"priceType_标准版\"",
"path": "/system/users",
"includeHistory": False
}
response = requests.post(url, json=body)
if response.status_code == 200:
return response.json()
else:
return []
if not product_id:
return None
file_ids = get_file_id(entity_id, product_id)
if not file_ids:
return None
file_info = get_file_by_fid(file_ids[0])
return file_info.get("uri")
@staticmethod
def send_company_mail(to_address: str, subject: str, content: str):
url = "https://a.cmdp.cn/apm/v1/ability/api000357"
body = {"_data": {
"type": "html",
"tos": [to_address],
"subject": subject,
"text": content
}
}
response = requests.post(url, json=body)
return response.json()
@staticmethod
def get_qrcode_base64(product_qrcode_path):
if not product_qrcode_path:
return ""
response = requests.get(product_qrcode_path)
if response.status_code == 200:
return base64.b64encode(response.content).decode('utf-8')
else:
return ""
@staticmethod
def get_information_disclosure_picture(year: str, security_code: str):
basiceg_url = "http://a.cmdp.cn/basiceg/v1/bytes"
file_path = f"/usercentre03-dev/user/cache/{year}/信披成绩单/{security_code}_idsr.jpg"
static_path = f"/product-data/static/xpcjd/{security_code}_idsr.jpg"
if file_exists(file_path):
resp = requests.get(basiceg_url + file_path)
if resp.status_code == 200:
data = resp.content
requests.put(basiceg_url + static_path, data=data)
return f'<img src="http://www.cmdp.cn/a/s/xpcjd/{security_code}_idsr.jpg" width="100%">'
else:
return ""
st = SalesTools()
class CalendarEventDB(CalendarDB):
def _transfer_calendar_data(self, data: dict) -> dict:
pass
def trigger(self, data: dict):
security_code = data.get("company_code")
user_list = st.get_user_info(security_code)
# 获取公司信息
company_info = st.get_org_info(security_code)
eid = company_info.get("eid")
full_name = company_info.get("fullName")
# 打标签
if user_list:
for user in user_list:
user_id = user.get("id")
st.update_user_tag(user_id)
# 获取bp id
bp_info = self.cloud_connection.get_user_info_by_name(self.name)
bp_id = bp_info.get("id")
bp_mail = bp_info.get("email")
# 获取报告文件
now = time.localtime(time.time())
year = now.tm_year
month = now.tm_mon
if month >= 5:
year = year - 1
else:
year = year - 2
report_tags = [f"报告年度_{year}", "biz_项目主体类别_单体",
"biz_业务场所_深交所"]
report_rule_info = st.get_report_rule(report_tags)
report_rule_path = report_rule_info.get("data", {}).get("records", "")
if report_rule_path:
report_rule_path = report_rule_path[0].get("filePath", "")
report_info = st.get_report_info(bp_id, security_code, report_rule_path)
report_ticket = report_info.get("data").get("path")
regression_testing = st.get_regression_testing(eid)
with open("template.html", "r", encoding="utf-8") as f:
content = f.read()
content = content.replace("{{report_ticket}}", report_ticket)
content = content.replace("{{regression_testing}}", regression_testing)
content = content.replace("{{picture}}", st.get_information_disclosure_picture("2024", security_code))
st.send_company_mail(bp_mail, f"客户拜访准备文件-{full_name}[{security_code}]", content)
ssp.add_row(doc_id=f_id, sheet_id="tTMuVB", records=[
{
"values": {
"证券代码": [
{
"type": "text",
"text": data["company_code"]
}
],
"证券简称": [{
"type": "text",
"text": data["company_name"]
}]
,
"地点": [{
"type": "text",
"text": data["company_address"]
}]
,
"拜访时间": data["visit_time"] + "000"
,
"结束时间": data["end_time"] + "000",
"拜访人": [{
"type": "text",
"text": data["_uid"]
}]
}
}])
ceb = CalendarEventDB()
def get_approval(template_id):
approval_list = wapi.get_approval_list_weekly(template_id)
info = {}
for approval_id in approval_list:
approval_info = wapi.get_approval_info(approval_id)
approval_info = parse_data(approval_info["info"],
{"Number-1697103952657": "company_code", "Text-1697103919621": "company_name",
"Text-1728374569134": "company_address", "Date-1728370374837": "visit_time",
"Date-1728370485478": "end_time"})
_uid = approval_info["_uid"]
if _uid not in info:
_u = wup.get_user_info(_uid)
info[_uid] = {"name": _u["name"]}
else:
_p = info[_uid]
company_code = approval_info["company_code"]
if company_code not in _p:
_p[company_code] = 1
else:
_p[company_code] += 1
return info
def generate_approval_report_html(times_report):
htms = "<!DOCTYPE html><html lang='en'><head><meta charset='UTF-8'><title></title><style>table{width:100%;border-collapse:collapse;font-family:Arial,sans-serif;box-shadow:0 2px 10px rgba(0,0,0,0.1)}caption{font-size:2em;font-weight:bold;margin:1em 0}th,td{border:1px solid#ccc;text-align:center;padding:15px;transition:background-color 0.3s}thead tr{background-color:#007BFF;color:#fff}tbody tr:nth-child(odd){background-color:#f9f9f9}tbody tr:hover{background-color:#e2e6ea}tfoot tr td{text-align:right;padding-right:20px}th{border-bottom:2px solid#007BFF}</style> </head><body><table border='1'><tr><th>姓名</th><th>公司代码</th><th>次数</th></tr>"
htme = "</table></body></html>"
content = ""
for key in times_report:
key_report = times_report[key]
name = key_report["name"]
key_report_list = [(i, key_report[i]) for i in key_report if i != "name"]
size = len(key_report_list)
for idx, key_info in enumerate(key_report_list):
if idx == 0:
content += f"<tr><td rowspan='{size}'>{name}</td><td>{key_info[0]}</td><td>{key_info[1]}</td></tr>"
else:
content += f"<tr><td>{key_info[0]}</td><td>{key_info[1]}</td></tr>"
return htms + content + htme
def new_product_print(info):
url = "https://cloud.cmdp.cn/api/product/print/customize"
body = {"userId": info["user_id"], "secCode": info["company_code"], "prodName": info["service_name"],
"opUserId": info["_uid"]}
r = requests.post(url, json=body)
return r.json()