# !/usr/bin/env python
# -*- coding:utf-8 -*-
import base64
import json
import time
from abc import abstractmethod
import requests
class WXworkAuth:
def __init__(self, app_id, app_key):
self.crop_id = app_id
self.crop_secret = app_key
self.access_token = None
self.timestamp = None
def _get_access_token(self):
self.timestamp = int(time.time())
url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.crop_id}&corpsecret={self.crop_secret}"
response = requests.get(url)
access_info = response.json()
if 'access_token' in access_info:
self.access_token = access_info['access_token']
else:
raise Exception('获取access_token失败')
def get_access_token(self):
_now = int(time.time())
if not self.access_token or (_now - self.timestamp) > 7200:
self.refresh_token()
return self.access_token
def refresh_token(self):
try:
self._get_access_token()
except Exception as e:
print(e, '重新获取')
self._get_access_token()
class CloudFunctionAPI:
def __init__(self, base_url):
self.base_url = base_url
def get_user_info_by_id(self, user_id: str) -> dict:
url = f"{self.base_url}/in/api/user"
body = {
"uid": user_id
}
response = requests.get(url, params=body)
if response.status_code != 200:
return {}
user_info = response.json().get('data', {})
return user_info
def get_user_info_by_sec(self, security_code: str) -> list:
url = f"{self.base_url}/in/api/userQuery"
body = {
"secCode": security_code,
"status": 10,
"PageSize": 10,
"PageNo": 1
}
response = requests.post(url, json=body)
if response.status_code != 200:
return []
user_info = response.json().get("list")
if user_info is None:
user_info = [{}]
return user_info
def get_user_info_by_name(self, user_name: str) -> dict:
url = f"{self.base_url}/in/api/userQuery"
body = {
"loginName": user_name,
"status": 10,
"PageSize": 10,
"PageNo": 1
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
user_info = response.json().get("list")
if user_info is None:
user_info = {}
else:
user_info = user_info[0]
return user_info
@staticmethod
def parse_tags(tags: list):
def _parse_tag(_tag: str):
return _tag.split("_")
temp_dict = {}
for tag in tags:
tag_split = _parse_tag(tag)
size = len(tag_split)
cursor = None
for idx, i in enumerate(tag_split):
if idx == 0:
cursor = temp_dict
val = {} if idx < size - 1 else True
cursor[i] = val
if isinstance(val, bool):
cursor = None
else:
cursor = cursor[i]
print(temp_dict)
return temp_dict
def get_user_tags(self, user_id: str = "", user_name: str = "") -> dict:
if user_id:
user_info = self.get_user_info_by_id(user_id)
elif user_name:
user_info = self.get_user_info_by_name(user_name)
else:
return {}
tags = user_info.get('tags', [])
return self.parse_tags(tags)
def update_user_tag(self, user_id: str, key: str, value: str):
url = f"{self.base_url}/in/api/userTags"
body = {
"userId": user_id,
"tags": [f"{key}_{value}"]
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
user_info = response.json()
return user_info
def update_user_tag_by_name(self, user_name: str, key: str, value: str):
user_info = self.get_user_info_by_name(user_name)
user_id = user_info.get('id')
if not user_id:
return {}
return self.update_user_tag(user_id, key, value)
def update_user_tags(self, user_id: str, tags: dict):
url = f"{self.base_url}/in/api/userTags"
body = {
"userId": user_id,
"tags": [f"{key}_{value}" for key, value in tags.items()]
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
user_info = response.json()
return user_info
class WXworkBaseAPI:
def __init__(self, auth: WXworkAuth):
self.auth = auth
@property
def access_token(self):
return self.auth.get_access_token()
class WXworkApprovalAPI(WXworkBaseAPI):
def get_approval_list(self, template_id: str, start_time: int, end_time: int) -> list:
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovalinfo?access_token={self.access_token}"
body = {
"starttime": str(start_time),
"endtime": str(end_time),
"new_cursor": "",
"size": 100,
"filters": [
{
"key": "template_id",
"value": template_id
},
{
"key": "sp_status",
"value": "2"
}
]
}
response = requests.post(url, json=body)
if response.status_code != 200:
return []
else:
approval_info = response.json()
return approval_info["sp_no_list"]
def get_approval_info(self, approval_id: str) -> dict:
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovaldetail?access_token={self.access_token}"
body = {
"sp_no": approval_id
}
response = requests.post(url, json=body)
approval_info = response.json()
return approval_info
def get_approval_list_weekly(self, template_id) -> list:
current_time = time.time()
# 获取当前时间的结构体
current_local_time = time.localtime(current_time)
# 星期几(0=星期一,6=星期天)
weekday = current_local_time.tm_wday
# 计算下一个星期天的天数
days_until_sunday = (6 - weekday) % 7
this_weekday = int(current_time + (days_until_sunday * 86400))
last_weekday = int(this_weekday - 7 * 24 * 3600)
return self.get_approval_list(template_id, last_weekday, this_weekday)
class WXworkCalendarAPI(WXworkBaseAPI):
def create_calendar(self, calendar_name, userid):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/add?access_token={self.access_token}"
body = {
"calendar": {
"admins": [
userid
],
"set_as_default": 1,
"summary": calendar_name,
"color": "#FF3030",
"description": "访问日历",
"is_public": 0,
"is_corp_calendar": 0,
"shares": [
{
"userid": userid,
"permission": 1
},
],
}
}
response = requests.post(url, json=body)
cinfo = response.json()
if "cal_id" in cinfo:
return cinfo["cal_id"]
else:
return cinfo
def delete_calendar(self, calendar_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/del?access_token={self.access_token}"
body = {
"cal_id": calendar_id
}
response = requests.post(url, json=body)
calendar_info = response.json()
return calendar_info
def get_calendar_list(self, calendar_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get_by_calendar?access_token={self.access_token}"
body = {
"cal_id": calendar_id,
"offset": 0,
"limit": 1000
}
response = requests.post(url, json=body)
calendar_list = response.json()
return calendar_list
def create_schedule(self, calendar_id, schedule_name, start_time, end_time, user_id, content, address=""):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/add?access_token={self.access_token}"
if start_time == end_time:
end_time = str(int(start_time) + 1)
body = {
"schedule": {
"admins": [
user_id
],
"start_time": start_time,
"end_time": end_time,
"is_whole_day": 0,
"attendees": [{
"userid": user_id
}],
"summary": schedule_name,
"description": content,
"reminders": {
"is_remind": 1,
"remind_before_event_secs": 3600,
"timezone": 8
},
"location": address,
"cal_id": calendar_id
}
}
response = requests.post(url, json=body)
schedule_info = response.json()
return schedule_info
def get_schedule(self, schedule_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get?access_token={self.access_token}"
body = {
"schedule_id_list": [
schedule_id
]
}
response = requests.post(url, json=body)
schedule_list = response.json()
return schedule_list
def delete_schedule(self, schedule_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/del?access_token={self.access_token}"
body = {
"schedule_id": schedule_id
}
response = requests.post(url, json=body)
schedule_info = response.json()
return schedule_info
class WXworkUserInfoAPI(WXworkBaseAPI):
def get_user_info(self, user_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token={self.access_token}&userid={user_id}"
response = requests.get(url)
user_info = response.json()
return user_info
class WXworkDocAPI:
def __init__(self, auth: WXworkAuth, space_id):
self.space_id = space_id
self.auth = auth
self.access_token = self.auth.get_access_token()
def create_space(self, space_name):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedrive/space_create?access_token={self.access_token}"
body = {
"space_name": space_name,
"auth_info": [{
"type": 1,
"userid": "lychang",
"auth": 7
}],
"space_sub_type": 0
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_space_info(self):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedrive/space_info?access_token={self.access_token}"
body = {
"spaceid": self.space_id
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_space_file_list(self):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedrive/file_list?access_token={self.access_token}"
body = {
"spaceid": self.space_id,
"fatherid": self.space_id,
"sort_type": 1,
"start": 0,
"limit": 100
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_document_info(self, doc_id):
self.doc_id = doc_id
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/document/get?access_token={self.access_token}"
body = {
"docid": self.doc_id
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_table_info(self, doc_id, sheet_id, view_ids: list):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_views?access_token={self.access_token}"
body = {
"docid": doc_id,
"sheet_id": sheet_id,
"view_ids": view_ids,
"offset": 0,
"limit": 1
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_table_data(self, doc_id, sheet_id, view_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_records?access_token={self.access_token}"
body = {
"docid": doc_id,
"sheet_id": sheet_id,
"view_id": view_id,
"record_ids": [],
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
"field_titles": [],
"field_ids": [],
"sort": [],
"offset": 0,
"limit": 100
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def get_sheet_data(self, doc_id):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_sheet?access_token={self.access_token}"
body = {
"docid": doc_id
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def create_table(self, doc_name, user_ids: list, father_id=None):
"""
doc_type 文档类型, 3:文档 4:表格 10:智能表格
"""
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/create_doc?access_token={self.access_token}"
body = {
"doc_type": 10,
"doc_name": doc_name,
"admin_users": user_ids
}
if self.space_id:
body["spaceid"] = self.space_id
if father_id:
body["fatherid"] = father_id
else:
body["fatherid"] = self.space_id
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def add_row(self, doc_id, sheet_id, records: list):
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/add_records?access_token={self.access_token}"
body = {
"docid": doc_id,
"sheet_id": sheet_id,
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
"records": records
}
response = requests.post(url, json=body)
if response.status_code != 200:
return {}
else:
approval_info = response.json()
return approval_info
def parse_data(data: dict, mapping: dict) -> dict:
user_id = data.get("applyer").get("userid")
app_data = data.get("apply_data").get("contents")
_info = {}
for i in app_data:
key = i.get("title")[0]["text"]
cp_id = i.get("id")
control = i.get("control")
value = i.get("value")
if control == "Text":
result = value.get("text")
elif control == "Number":
result = value.get("new_number")
elif control == "Date":
result = value.get("date").get("s_timestamp")
elif control == "Selector":
_t = value.get("selector").get("type")
if _t == "multi":
opts = value.get("selector").get("options", [])
result = [j["key"] for j in opts]
else:
opts = i.get("selector").get("options", [])
result = opts[0]["key"]
else:
result = ""
_info[cp_id] = {"key": key, "value": result}
_r = {v: _info.get(k, {}).get("value") for k, v in mapping.items()}
_r["_uid"] = user_id
return _r
crop_id = "wx98dfa35ad7e4b271"
space_id = "s.wx98dfa35ad7e4b271.744621152iSk"
f_id = 'dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw'
wxc_app_secret = "gyN5moBa6Ev1Vd0ZLUVrsEtAO_goppWKUBdsnSng-Ks"
wapi_app_secret = "oM0hwIi8GRPw6HWk9o5__g3v5ziz5CGyBUo2FASSrVw"
wxc_auth = WXworkAuth(crop_id, wxc_app_secret)
wapi_auth = WXworkAuth(crop_id, wapi_app_secret)
cf_api = CloudFunctionAPI("http://10.0.0.39/cloud")
wxc_api = WXworkCalendarAPI(wxc_auth)
wapi = WXworkApprovalAPI(wapi_auth)
wup = WXworkUserInfoAPI(wapi_auth)
ssp = WXworkDocAPI(wxc_auth, space_id)
def file_exists(file_path):
_url = f"https://a.cmdp.cn/basiceg/v1/csp/exist/{file_path}"
response = requests.get(_url.format(file_path=file_path))
return response.json().get("exist", False)
class CalendarDB:
def __init__(self):
self.name = None
self.user_id = None
self.user_info = None
self.calendar_id = None
self.calendar_connection = None
self.cloud_connection = None
def login(self, login_user: str):
self.name = login_user
self.calendar_connection = wxc_api
self.cloud_connection = cf_api
user_info = self.cloud_connection.get_user_info_by_name(user_name=self.name)
self.user_id = user_info.get('id')
user_tags = self.cloud_connection.parse_tags(user_info.get('tags', []))
calendar_ids = [i for i in user_tags.get('calendar', {}).get('id', {})]
self.calendar_id = calendar_ids[0] if calendar_ids else None
if not self.calendar_id:
self.calendar_id = self.create_calendar("访问日历")
if self.calendar_id:
self.cloud_connection.update_user_tag(user_id=self.user_id, key="calendar_id", value=self.calendar_id)
return {"calendar": self.calendar_id}
def create_calendar(self, calendar_name: str):
return self.calendar_connection.create_calendar(calendar_name, self.name)
@abstractmethod
def _transfer_calendar_data(self, data: dict) -> dict:
pass
def get_all(self):
return self.calendar_connection.get_calendar_list(self.calendar_id)
def add(self, data: dict):
_data = data.copy()
data.pop("_uid")
schedule_name = f"{data.get('company_name')}[{data.get('company_code')}] 拜访计划"
start_time = data.get("visit_time")
end_time = data.get("end_time")
content = json.dumps(data, ensure_ascii=False)
address = data.get("company_address")
self.calendar_connection.create_schedule(self.calendar_id, schedule_name, start_time, end_time, self.name,
content,
address)
return self.trigger(_data)
@abstractmethod
def trigger(self, data: dict):
pass
def delete(self, schedule_id: str):
self.trigger({})
pass
def get_report_ticket(file_path: str, download_name: str):
"""
四.生成文件下载地址生成【通用】
说明
调用接口获取ticket,然后拼接地址后发给用户
接口地址
https://a.cmdp.cn/v1/cloudfile/file/createTicket
输入参数[post]
{
"appId": "",
"uri": "s3://usercentre03-dev/user/documents/my document/00002rfdata11792529c9f6e136e282b6416ff20d3f.json", //文件的s3地址
"startTime": "2024-03-21 12:12:12",
"endTime": "",
"maxTimes": 0,
"uid": "",
"name": "1.json", //下载时的文件名
"isPublic":false
}
返回示例:
{
"code": 200,
"msg": null,
"data": "LCXNDEwla",
"validations": null,
"token": null,
"success": true
}
拼接文件下载地址
https://ea.cmdp.cn/v1/cloudfile/file/?ticket=LCXNDEwla
"""
url = "https://a.cmdp.cn/v1/cloudfile/file/createTicket"
body = {
"appId": "",
"uri": file_path,
# 文件的s3地址
"startTime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"endTime": "",
"maxTimes": 0,
"uid": "",
"name": download_name, # 下载时的文件名
"isPublic": False
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
ticket = response.json().get("data")
download_url = f"https://a.cmdp.cn/v1/cloudfile/file/?ticket={ticket}"
return download_url
class SalesTools:
@staticmethod
def get_user_info(security_code: str):
"""
二.用户列表查询
说明
【同步调用】
接口地址
http://10.0.0.39/cloud/in/api/userQuery
输入参数[post]
{
"secCode": "000002",
"status":10,
"PageSize": 10,
"PageNo": 1
}"""
return cf_api.get_user_info_by_sec(security_code)
@staticmethod
def update_user_tag(user_id: str):
"""
三.用户打标签
说明
【同步调用】
接口地址
http://10.0.0.39/cloud/in/api/user
输入参数[post]
{
"id": "1722805021193383937",
"tags": [
"客户类型_上市公司",
"客户权限_动态令牌"
]
}
"""
cf_api.update_user_tags(user_id, {"客户类型": "上市公司", "客户权限": "动态令牌"})
@staticmethod
def get_org_info(security_code: str):
"""
四.通用机构信息查询
说明
接口地址
http://ea.cmdp.cn/apm/v1/ability/api000353
输入参数[post]
{
"_data": {
"searchType": "stock",
"secCode": "000002" //证券代码
}
}
"""
url = "http://a.cmdp.cn/apm/v1/ability/api000353"
body = {
"_data": {
"searchType": "stock",
"secCode": security_code
}
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
return response.json().get("data", {})
@staticmethod
def get_report_rule(tags: list):
"""
四.尽调报告的规则文件地址查询
说明
接口地址
https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_list/usys/pbcs/release
输入参数[post]
{
"opUserId": "genesys_app",
"sourceUserId": "genesys_app",
"filter":"'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.pbcMeta.name' eq '尽调报告云盘项目名称,需要问@冯昕宇'",【企业售前画像报告】
"tags":["报告年度_2023","biz_项目主体类别_单体","biz_业务场所_深交所"]
}
"""
url = "https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_list/usys/pbcs/release"
body = {
"opUserId": "genesys_app",
"sourceUserId": "genesys_app",
"filter": f"'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.pbcMeta.name' eq '企业售前画像报告'",
"tags": tags
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
return response.json()
@staticmethod
def get_report_info(user_id: str, security_code: str, rule_path: str):
"""
四.尽调报告执行
说明
执行后报告需要生成ticket的下载地址
接口地址
https://a.cmdp.cn/v1/cmp/cal/fn/PerformRuleFile
输入参数[post]
{
"issync": 2,
"ruleobj": "temp-003/rule.json",
"dataobj": {
"uid": "用户ID", //填入真实用户ID,执行后会在微信公众收到结果消息,可以直接打开报告
"_data": {
"secCode": "000002"
},
"_var": {
"config": {
"prodName": "企业售前画像报告"
}
}
}
}
"""
url = "https://a.cmdp.cn/v1/cmp/cal/fn/PerformRuleFile"
body = {
"issync": 2, # 同步执行
"ruleobj": rule_path,
"dataobj": {
"uid": user_id,
"_data": {
"secCode": security_code
},
"_var": {
"config": {
"prodName": "企业售前画像报告"
}
}
}}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
return response.json().get("dumpmsg", {}).get("_out", {})
@staticmethod
def get_regression_testing(eid: str):
"""
执行后报告需要生成ticket的下载地址:2024年半年报
@卢天宝说明一下文件存储路径,@常连钰直接根据文件存储路径生成下载ticket,但是需要判断是否存在:https://a.cmdp.cn/basiceg/v1/csp/exist/{path}
存储路径:announce02/{textId}._val_01.json , announce02/{textId}._val_01.docx
textId获取:
请求地址(post):http://a.cmdp.cn/cnd/v1/entity/search/cmdb_bfmapping
参数 :
{
"tags": [
"报告期_2024QR1", //替换为真实报告期
"GE8002005E_100000000002334460" //GE8002005E_{eid}
]
}
返回示例:
{
"total": 1,
"records": [
{
"eid": "100000000037315576",
"tags": {
"公司名称_上海凌云实业发展股份有限公司": true,
"GE8002005E_100000000002334460": true,
"证券代码_900957": true,
"报告期_2024QR1": true,
"TEXTID_1219815286": true, //其中1219815286就是text的值
"公告标题_凌云B股:凌云B股2024年第一季度报告": true
}
}
]
}
"""
def get_text_id(report_date, entity_id):
url = "https://a.cmdp.cn/cnd/v1/entity/search/cmdb_bfmapping"
body = {
"tags": [
f"报告期_{report_date}",
f"GE8002005E_{entity_id}"
]
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=body, headers=headers)
result = response.json()
tags_ = result.get("records", [{}])
if tags_:
tags = tags_[0].get("tags", [{}])
else:
tags = []
text_id = None
for tag in tags:
l = tag.split("_")
if l[0] == "TEXTID":
text_id = l[1]
return text_id
def get_report_datetime():
report_datetime = time.localtime(time.time())
year = report_datetime.tm_year
month = report_datetime.tm_mon
if month >= 9:
return [f"{year}SAR", f"{year - 1}AR"]
elif month >= 4:
return [f"{year - 1}SAR", f"{year - 1}AR"]
else:
return [f"{year - 1}SAR", f"{year - 2}AR"]
def get_report(tid):
if not tid:
return None
docx_path = f"announce02/{tid}_val_01.docx"
json_path = f"announce02/{tid}_val_01.json"
if file_exists(json_path) and file_exists(docx_path):
return flag, get_report_ticket("s3://" + docx_path, "回归测试报告.docx")
else:
print("file not exist")
return None, None
flags = get_report_datetime()
result = ""
for flag in flags:
text_id = get_text_id(flag, eid)
f, r = get_report(text_id)
if r:
result += f'
{f}:
{r}' if result != "": return result else: return "暂无" @staticmethod def get_current_product_id(): now = time.localtime(time.time()) year = now.tm_year month = now.tm_mon if month >= 5: year = year - 1 else: year = year - 2 url = "https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_listall/usys/products/release" body = { "opUserId": "genesys_app", "sourceUserId": "genesys_app", "filter": "'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.resource.label' eq '小智智填-定期报告专版'", "tags": ["tech_sbom", f"prod_year_{year}", "type_入口", "prod_小智转写"] } response = requests.post(url, json=body) if response.status_code == 200: response_json = response.json() code = response_json.get("code", -1) if code == 200: data = response_json.get("data", {}) records = data.get("records", []) if records: widget_info = records[0].get("fileInfo", {}).get("extends", {}).get("widget", {}) product_id = widget_info.get("id") return product_id return None @staticmethod def get_smart_genie_product(entity_id: str, product_id: str): """ 说明 选样本后的订购【打印或复制】小智类产品 内部接口,eid和个人uid是输入的。 产品选择是输入产品id: 产品 产品id 说明 小智复核-定期报告专版 已有产品,示例 小智智填-定期报告专版 1854436156570607618 产品id待补充 接口地址 https://cloud.cmdp.cn/api/product/deliver/print 输入参数[post] { "eid": "eid,不填就是查询用户所属公司的", "pid": "必填,入口产品id", "priceType":"产品版本,枚举:体验、标准版、旗舰版、特别优惠版";售前产品固定:标准版 } 输入示例 { "eid": "100000000002334059", "pid": "1858326969193848833", "priceType":"标准版" } 返回参数 {"Code":"状态码","data":"入口产品id"} 返回示例 {"Code":200,"data":"1853245209740582914"} 打印成功后会有公众号通知 """ url = "https://cloud.cmdp.cn/api/product/deliver/print" body = { "eid": entity_id, "pid": product_id if product_id else "", "priceType": "体验版" } response = requests.post(url, json=body) if response.status_code == 200: response_json = response.json() code = response_json.get("code", -1) if code == 200: return response_json.get("data", "") return None @staticmethod def get_product_qrcode(entity_id: str, product_id: str): """ 接口示例: 标签查fid: curl --location --request POST "http://10.0.0.39/octopus/usr/genesys_app/file/query" ^ --header "User-Agent: Apifox/1.0.0 (https://apifox.com)" ^ --header "Content-Type: application/json" ^ --header "Accept: */*" ^ --header "Host: 10.0.0.39" ^ --header "Connection: keep-alive" ^ --data-raw "{ \"Category\": \"default\", \"ownerID\":\"genesys_app\", \"query\": \"\\\"uid_1652264045856165888\\\"\", \"path\":\"/system/users\", \"includeHistory\":false}" fid查文件: curl --location --request GET "http://10.0.0.39/octopus/usr/genesys_app/file/1-23823/meta" ^ --header "User-Agent: Apifox/1.0.0 (https://apifox.com)" ^ --header "Accept: */*" ^ --header "Host: 10.0.0.39" ^ --header "Connection: keep-alive" query查询标签: 查询标签 【"eid_{eid}","pid_{pid}","priceType_标准版"】 """ def get_file_by_fid(file_id): url = f"http://10.0.0.39/octopus/usr/genesys_app/file/{file_id}/meta" try: response = requests.get(url) if response.status_code == 200: return response.json() else: return {} except Exception: return {} def get_file_id(entity_id: str, product_id: str): url = "http://10.0.0.39/octopus/usr/genesys_app/file/query" body = { "Category": "default", "ownerID": "genesys_app", "query": f"\"eid_{entity_id}\" & \"pid_{product_id}\" & \"priceType_标准版\"", "path": "/system/users", "includeHistory": False } response = requests.post(url, json=body) if response.status_code == 200: return response.json() else: return [] if not product_id: return None file_ids = get_file_id(entity_id, product_id) if not file_ids: return None file_info = get_file_by_fid(file_ids[0]) return file_info.get("uri") @staticmethod def send_company_mail(to_address: str, subject: str, content: str): url = "https://a.cmdp.cn/apm/v1/ability/api000357" body = {"_data": { "type": "html", "tos": [to_address], "subject": subject, "text": content } } response = requests.post(url, json=body) return response.json() @staticmethod def get_qrcode_base64(product_qrcode_path): if not product_qrcode_path: return "" response = requests.get(product_qrcode_path) if response.status_code == 200: return base64.b64encode(response.content).decode('utf-8') else: return "" @staticmethod def get_information_disclosure_picture(year: str, security_code: str): basiceg_url = "http://a.cmdp.cn/basiceg/v1/bytes" file_path = f"/usercentre03-dev/user/cache/{year}/信披成绩单/{security_code}_idsr.jpg" static_path = f"/product-data/static/xpcjd/{security_code}_idsr.jpg" if file_exists(file_path): resp = requests.get(basiceg_url + file_path) if resp.status_code == 200: data = resp.content requests.put(basiceg_url + static_path, data=data) return f'姓名 | 公司代码 | 次数 |
---|