1202 lines
41 KiB
Python
1202 lines
41 KiB
Python
# !/usr/bin/env python
|
||
# -*- coding:utf-8 -*-
|
||
|
||
|
||
import base64
|
||
import json
|
||
import time
|
||
from abc import abstractmethod
|
||
|
||
import requests
|
||
|
||
|
||
class WXworkAuth:
|
||
def __init__(self, app_id, app_key):
|
||
self.crop_id = app_id
|
||
self.crop_secret = app_key
|
||
self.access_token = None
|
||
self.timestamp = None
|
||
|
||
def _get_access_token(self):
|
||
self.timestamp = int(time.time())
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.crop_id}&corpsecret={self.crop_secret}"
|
||
response = requests.get(url)
|
||
access_info = response.json()
|
||
if 'access_token' in access_info:
|
||
self.access_token = access_info['access_token']
|
||
else:
|
||
raise Exception('获取access_token失败')
|
||
|
||
def get_access_token(self):
|
||
_now = int(time.time())
|
||
if not self.access_token or (_now - self.timestamp) > 7200:
|
||
self.refresh_token()
|
||
return self.access_token
|
||
|
||
def refresh_token(self):
|
||
try:
|
||
self._get_access_token()
|
||
except Exception as e:
|
||
print(e, '重新获取')
|
||
self._get_access_token()
|
||
|
||
|
||
class CloudFunctionAPI:
|
||
def __init__(self, base_url):
|
||
self.base_url = base_url
|
||
|
||
def get_user_info_by_id(self, user_id: str) -> dict:
|
||
url = f"{self.base_url}/in/api/user"
|
||
body = {
|
||
"uid": user_id
|
||
|
||
}
|
||
response = requests.get(url, params=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
user_info = response.json().get('data', {})
|
||
return user_info
|
||
|
||
def get_user_info_by_sec(self, security_code: str) -> list:
|
||
url = f"{self.base_url}/in/api/userQuery"
|
||
body = {
|
||
"secCode": security_code,
|
||
"status": 10,
|
||
"PageSize": 10,
|
||
"PageNo": 1
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return []
|
||
|
||
user_info = response.json().get("list")
|
||
if user_info is None:
|
||
user_info = [{}]
|
||
return user_info
|
||
|
||
def get_user_info_by_name(self, user_name: str) -> dict:
|
||
url = f"{self.base_url}/in/api/userQuery"
|
||
body = {
|
||
"loginName": user_name,
|
||
"status": 10,
|
||
"PageSize": 10,
|
||
"PageNo": 1
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
user_info = response.json().get("list")
|
||
if user_info is None:
|
||
user_info = {}
|
||
else:
|
||
user_info = user_info[0]
|
||
return user_info
|
||
|
||
@staticmethod
|
||
def parse_tags(tags: list):
|
||
def _parse_tag(_tag: str):
|
||
return _tag.split("_")
|
||
|
||
temp_dict = {}
|
||
for tag in tags:
|
||
tag_split = _parse_tag(tag)
|
||
size = len(tag_split)
|
||
cursor = None
|
||
for idx, i in enumerate(tag_split):
|
||
if idx == 0:
|
||
cursor = temp_dict
|
||
val = {} if idx < size - 1 else True
|
||
cursor[i] = val
|
||
if isinstance(val, bool):
|
||
cursor = None
|
||
else:
|
||
cursor = cursor[i]
|
||
print(temp_dict)
|
||
return temp_dict
|
||
|
||
def get_user_tags(self, user_id: str = "", user_name: str = "") -> dict:
|
||
if user_id:
|
||
user_info = self.get_user_info_by_id(user_id)
|
||
elif user_name:
|
||
user_info = self.get_user_info_by_name(user_name)
|
||
else:
|
||
return {}
|
||
tags = user_info.get('tags', [])
|
||
return self.parse_tags(tags)
|
||
|
||
def update_user_tag(self, user_id: str, key: str, value: str):
|
||
url = f"{self.base_url}/in/api/userTags"
|
||
body = {
|
||
"userId": user_id,
|
||
"tags": [f"{key}_{value}"]
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
user_info = response.json()
|
||
return user_info
|
||
|
||
def update_user_tag_by_name(self, user_name: str, key: str, value: str):
|
||
user_info = self.get_user_info_by_name(user_name)
|
||
user_id = user_info.get('id')
|
||
if not user_id:
|
||
return {}
|
||
return self.update_user_tag(user_id, key, value)
|
||
|
||
def update_user_tags(self, user_id: str, tags: dict):
|
||
url = f"{self.base_url}/in/api/userTags"
|
||
body = {
|
||
"userId": user_id,
|
||
"tags": [f"{key}_{value}" for key, value in tags.items()]
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
user_info = response.json()
|
||
return user_info
|
||
|
||
|
||
class WXworkBaseAPI:
|
||
def __init__(self, auth: WXworkAuth):
|
||
self.auth = auth
|
||
|
||
@property
|
||
def access_token(self):
|
||
return self.auth.get_access_token()
|
||
|
||
|
||
class WXworkApprovalAPI(WXworkBaseAPI):
|
||
def get_approval_list(self, template_id: str, start_time: int, end_time: int) -> list:
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovalinfo?access_token={self.access_token}"
|
||
body = {
|
||
"starttime": str(start_time),
|
||
"endtime": str(end_time),
|
||
"new_cursor": "",
|
||
"size": 100,
|
||
"filters": [
|
||
{
|
||
"key": "template_id",
|
||
"value": template_id
|
||
},
|
||
{
|
||
"key": "sp_status",
|
||
"value": "2"
|
||
}
|
||
]
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return []
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info["sp_no_list"]
|
||
|
||
def get_approval_info(self, approval_id: str) -> dict:
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovaldetail?access_token={self.access_token}"
|
||
body = {
|
||
"sp_no": approval_id
|
||
}
|
||
response = requests.post(url, json=body)
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
def get_approval_list_weekly(self, template_id) -> list:
|
||
current_time = time.time()
|
||
|
||
# 获取当前时间的结构体
|
||
current_local_time = time.localtime(current_time)
|
||
|
||
# 星期几(0=星期一,6=星期天)
|
||
weekday = current_local_time.tm_wday
|
||
|
||
# 计算下一个星期天的天数
|
||
days_until_sunday = (6 - weekday) % 7
|
||
|
||
this_weekday = int(current_time + (days_until_sunday * 86400))
|
||
last_weekday = int(this_weekday - 7 * 24 * 3600)
|
||
return self.get_approval_list(template_id, last_weekday, this_weekday)
|
||
|
||
|
||
class WXworkCalendarAPI(WXworkBaseAPI):
|
||
|
||
def create_calendar(self, calendar_name, userid):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/add?access_token={self.access_token}"
|
||
body = {
|
||
"calendar": {
|
||
"admins": [
|
||
userid
|
||
],
|
||
"set_as_default": 1,
|
||
"summary": calendar_name,
|
||
"color": "#FF3030",
|
||
"description": "访问日历",
|
||
"is_public": 0,
|
||
"is_corp_calendar": 0,
|
||
"shares": [
|
||
{
|
||
"userid": userid,
|
||
"permission": 1
|
||
},
|
||
],
|
||
|
||
}
|
||
}
|
||
response = requests.post(url, json=body)
|
||
cinfo = response.json()
|
||
if "cal_id" in cinfo:
|
||
return cinfo["cal_id"]
|
||
else:
|
||
return cinfo
|
||
|
||
def delete_calendar(self, calendar_id):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/del?access_token={self.access_token}"
|
||
body = {
|
||
"cal_id": calendar_id
|
||
}
|
||
response = requests.post(url, json=body)
|
||
calendar_info = response.json()
|
||
return calendar_info
|
||
|
||
def get_calendar_list(self, calendar_id):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get_by_calendar?access_token={self.access_token}"
|
||
body = {
|
||
"cal_id": calendar_id,
|
||
"offset": 0,
|
||
"limit": 1000
|
||
}
|
||
response = requests.post(url, json=body)
|
||
calendar_list = response.json()
|
||
return calendar_list
|
||
|
||
def create_schedule(self, calendar_id, schedule_name, start_time, end_time, user_id, content, address=""):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/add?access_token={self.access_token}"
|
||
if start_time == end_time:
|
||
end_time = str(int(start_time) + 1)
|
||
body = {
|
||
"schedule": {
|
||
"admins": [
|
||
user_id
|
||
],
|
||
"start_time": start_time,
|
||
"end_time": end_time,
|
||
"is_whole_day": 0,
|
||
"attendees": [{
|
||
"userid": user_id
|
||
}],
|
||
"summary": schedule_name,
|
||
"description": content,
|
||
"reminders": {
|
||
"is_remind": 1,
|
||
"remind_before_event_secs": 3600,
|
||
"timezone": 8
|
||
},
|
||
"location": address,
|
||
"cal_id": calendar_id
|
||
}
|
||
}
|
||
response = requests.post(url, json=body)
|
||
schedule_info = response.json()
|
||
return schedule_info
|
||
|
||
def get_schedule(self, schedule_id):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get?access_token={self.access_token}"
|
||
body = {
|
||
"schedule_id_list": [
|
||
schedule_id
|
||
]
|
||
}
|
||
response = requests.post(url, json=body)
|
||
schedule_list = response.json()
|
||
return schedule_list
|
||
|
||
def delete_schedule(self, schedule_id):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/del?access_token={self.access_token}"
|
||
body = {
|
||
"schedule_id": schedule_id
|
||
}
|
||
response = requests.post(url, json=body)
|
||
schedule_info = response.json()
|
||
return schedule_info
|
||
|
||
|
||
class WXworkUserInfoAPI(WXworkBaseAPI):
|
||
def get_user_info(self, user_id):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token={self.access_token}&userid={user_id}"
|
||
response = requests.get(url)
|
||
user_info = response.json()
|
||
return user_info
|
||
|
||
|
||
class WXworkDocAPI:
|
||
def __init__(self, auth: WXworkAuth, space_id):
|
||
self.space_id = space_id
|
||
self.auth = auth
|
||
self.access_token = self.auth.get_access_token()
|
||
|
||
def create_space(self, space_name):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedrive/space_create?access_token={self.access_token}"
|
||
body = {
|
||
"space_name": space_name,
|
||
"auth_info": [{
|
||
"type": 1,
|
||
"userid": "lychang",
|
||
"auth": 7
|
||
}],
|
||
"space_sub_type": 0
|
||
}
|
||
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
def get_space_info(self):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedrive/space_info?access_token={self.access_token}"
|
||
body = {
|
||
"spaceid": self.space_id
|
||
}
|
||
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
def get_space_file_list(self):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedrive/file_list?access_token={self.access_token}"
|
||
body = {
|
||
"spaceid": self.space_id,
|
||
"fatherid": self.space_id,
|
||
"sort_type": 1,
|
||
"start": 0,
|
||
"limit": 100
|
||
}
|
||
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
def get_document_info(self, doc_id):
|
||
self.doc_id = doc_id
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/document/get?access_token={self.access_token}"
|
||
body = {
|
||
"docid": self.doc_id
|
||
}
|
||
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
def get_table_info(self, doc_id, sheet_id, view_ids: list):
|
||
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_views?access_token={self.access_token}"
|
||
body = {
|
||
"docid": doc_id,
|
||
"sheet_id": sheet_id,
|
||
"view_ids": view_ids,
|
||
"offset": 0,
|
||
"limit": 1
|
||
}
|
||
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
def get_table_data(self, doc_id, sheet_id, view_id):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_records?access_token={self.access_token}"
|
||
body = {
|
||
"docid": doc_id,
|
||
"sheet_id": sheet_id,
|
||
"view_id": view_id,
|
||
"record_ids": [],
|
||
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
|
||
"field_titles": [],
|
||
"field_ids": [],
|
||
"sort": [],
|
||
"offset": 0,
|
||
"limit": 100
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
def get_sheet_data(self, doc_id):
|
||
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_sheet?access_token={self.access_token}"
|
||
body = {
|
||
"docid": doc_id
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
def create_table(self, doc_name, user_ids: list, father_id=None):
|
||
"""
|
||
doc_type 文档类型, 3:文档 4:表格 10:智能表格
|
||
"""
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/create_doc?access_token={self.access_token}"
|
||
body = {
|
||
"doc_type": 10,
|
||
"doc_name": doc_name,
|
||
"admin_users": user_ids
|
||
}
|
||
if self.space_id:
|
||
body["spaceid"] = self.space_id
|
||
if father_id:
|
||
body["fatherid"] = father_id
|
||
else:
|
||
body["fatherid"] = self.space_id
|
||
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
def add_row(self, doc_id, sheet_id, records: list):
|
||
url = f"https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/add_records?access_token={self.access_token}"
|
||
body = {
|
||
"docid": doc_id,
|
||
"sheet_id": sheet_id,
|
||
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
|
||
"records": records
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code != 200:
|
||
return {}
|
||
else:
|
||
approval_info = response.json()
|
||
return approval_info
|
||
|
||
|
||
def parse_data(data: dict, mapping: dict) -> dict:
|
||
user_id = data.get("applyer").get("userid")
|
||
app_data = data.get("apply_data").get("contents")
|
||
_info = {}
|
||
for i in app_data:
|
||
key = i.get("title")[0]["text"]
|
||
cp_id = i.get("id")
|
||
control = i.get("control")
|
||
value = i.get("value")
|
||
if control == "Text":
|
||
result = value.get("text")
|
||
elif control == "Number":
|
||
result = value.get("new_number")
|
||
elif control == "Date":
|
||
result = value.get("date").get("s_timestamp")
|
||
elif control == "Selector":
|
||
_t = value.get("selector").get("type")
|
||
if _t == "multi":
|
||
opts = value.get("selector").get("options", [])
|
||
result = [j["key"] for j in opts]
|
||
else:
|
||
opts = i.get("selector").get("options", [])
|
||
result = opts[0]["key"]
|
||
else:
|
||
result = ""
|
||
_info[cp_id] = {"key": key, "value": result}
|
||
_r = {v: _info.get(k, {}).get("value") for k, v in mapping.items()}
|
||
_r["_uid"] = user_id
|
||
return _r
|
||
|
||
|
||
crop_id = "wx98dfa35ad7e4b271"
|
||
space_id = "s.wx98dfa35ad7e4b271.744621152iSk"
|
||
f_id = 'dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw'
|
||
wxc_app_secret = "gyN5moBa6Ev1Vd0ZLUVrsEtAO_goppWKUBdsnSng-Ks"
|
||
wapi_app_secret = "oM0hwIi8GRPw6HWk9o5__g3v5ziz5CGyBUo2FASSrVw"
|
||
wxc_auth = WXworkAuth(crop_id, wxc_app_secret)
|
||
wapi_auth = WXworkAuth(crop_id, wapi_app_secret)
|
||
cf_api = CloudFunctionAPI("http://10.0.0.39/cloud")
|
||
wxc_api = WXworkCalendarAPI(wxc_auth)
|
||
wapi = WXworkApprovalAPI(wapi_auth)
|
||
wup = WXworkUserInfoAPI(wapi_auth)
|
||
ssp = WXworkDocAPI(wxc_auth, space_id)
|
||
|
||
|
||
def file_exists(file_path):
|
||
_url = f"https://a.cmdp.cn/basiceg/v1/csp/exist/{file_path}"
|
||
response = requests.get(_url.format(file_path=file_path))
|
||
return response.json().get("exist", False)
|
||
|
||
|
||
class CalendarDB:
|
||
def __init__(self):
|
||
self.name = None
|
||
self.user_id = None
|
||
self.user_info = None
|
||
self.calendar_id = None
|
||
self.calendar_connection = None
|
||
self.cloud_connection = None
|
||
|
||
def login(self, login_user: str):
|
||
self.name = login_user
|
||
self.calendar_connection = wxc_api
|
||
self.cloud_connection = cf_api
|
||
user_info = self.cloud_connection.get_user_info_by_name(user_name=self.name)
|
||
self.user_id = user_info.get('id')
|
||
user_tags = self.cloud_connection.parse_tags(user_info.get('tags', []))
|
||
calendar_ids = [i for i in user_tags.get('calendar', {}).get('id', {})]
|
||
self.calendar_id = calendar_ids[0] if calendar_ids else None
|
||
if not self.calendar_id:
|
||
self.calendar_id = self.create_calendar("访问日历")
|
||
if self.calendar_id:
|
||
self.cloud_connection.update_user_tag(user_id=self.user_id, key="calendar_id", value=self.calendar_id)
|
||
return {"calendar": self.calendar_id}
|
||
|
||
def create_calendar(self, calendar_name: str):
|
||
return self.calendar_connection.create_calendar(calendar_name, self.name)
|
||
|
||
@abstractmethod
|
||
def _transfer_calendar_data(self, data: dict) -> dict:
|
||
pass
|
||
|
||
def get_all(self):
|
||
return self.calendar_connection.get_calendar_list(self.calendar_id)
|
||
|
||
def add(self, data: dict):
|
||
_data = data.copy()
|
||
data.pop("_uid")
|
||
schedule_name = f"{data.get('company_name')}[{data.get('company_code')}] 拜访计划"
|
||
start_time = data.get("visit_time")
|
||
end_time = data.get("end_time")
|
||
content = json.dumps(data, ensure_ascii=False)
|
||
address = data.get("company_address")
|
||
self.calendar_connection.create_schedule(self.calendar_id, schedule_name, start_time, end_time, self.name,
|
||
content,
|
||
address)
|
||
return self.trigger(_data)
|
||
|
||
@abstractmethod
|
||
def trigger(self, data: dict):
|
||
pass
|
||
|
||
def delete(self, schedule_id: str):
|
||
self.trigger({})
|
||
pass
|
||
|
||
|
||
def get_report_ticket(file_path: str, download_name: str):
|
||
"""
|
||
四.生成文件下载地址生成【通用】
|
||
说明
|
||
调用接口获取ticket,然后拼接地址后发给用户
|
||
接口地址
|
||
https://a.cmdp.cn/v1/cloudfile/file/createTicket
|
||
输入参数[post]
|
||
{
|
||
"appId": "",
|
||
"uri": "s3://usercentre03-dev/user/documents/my document/00002rfdata11792529c9f6e136e282b6416ff20d3f.json", //文件的s3地址
|
||
"startTime": "2024-03-21 12:12:12",
|
||
"endTime": "",
|
||
"maxTimes": 0,
|
||
"uid": "",
|
||
"name": "1.json", //下载时的文件名
|
||
"isPublic":false
|
||
}
|
||
|
||
返回示例:
|
||
{
|
||
"code": 200,
|
||
"msg": null,
|
||
"data": "LCXNDEwla",
|
||
"validations": null,
|
||
"token": null,
|
||
"success": true
|
||
}
|
||
拼接文件下载地址
|
||
https://ea.cmdp.cn/v1/cloudfile/file/?ticket=LCXNDEwla
|
||
|
||
"""
|
||
url = "https://a.cmdp.cn/v1/cloudfile/file/createTicket"
|
||
body = {
|
||
"appId": "",
|
||
"uri": file_path,
|
||
# 文件的s3地址
|
||
"startTime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
|
||
"endTime": "",
|
||
"maxTimes": 0,
|
||
"uid": "",
|
||
"name": download_name, # 下载时的文件名
|
||
"isPublic": False
|
||
}
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
response = requests.post(url, json=body, headers=headers)
|
||
ticket = response.json().get("data")
|
||
download_url = f"https://a.cmdp.cn/v1/cloudfile/file/?ticket={ticket}"
|
||
return download_url
|
||
|
||
|
||
class SalesTools:
|
||
|
||
@staticmethod
|
||
def get_user_info(security_code: str):
|
||
"""
|
||
二.用户列表查询
|
||
说明
|
||
【同步调用】
|
||
接口地址
|
||
http://10.0.0.39/cloud/in/api/userQuery
|
||
|
||
输入参数[post]
|
||
{
|
||
"secCode": "000002",
|
||
"status":10,
|
||
"PageSize": 10,
|
||
"PageNo": 1
|
||
}"""
|
||
return cf_api.get_user_info_by_sec(security_code)
|
||
|
||
@staticmethod
|
||
def update_user_tag(user_id: str):
|
||
"""
|
||
三.用户打标签
|
||
说明
|
||
【同步调用】
|
||
接口地址
|
||
|
||
http://10.0.0.39/cloud/in/api/user
|
||
输入参数[post]
|
||
{
|
||
"id": "1722805021193383937",
|
||
"tags": [
|
||
"客户类型_上市公司",
|
||
"客户权限_动态令牌"
|
||
]
|
||
}
|
||
"""
|
||
cf_api.update_user_tags(user_id, {"客户类型": "上市公司", "客户权限": "动态令牌"})
|
||
|
||
@staticmethod
|
||
def get_org_info(security_code: str):
|
||
"""
|
||
四.通用机构信息查询
|
||
说明
|
||
接口地址
|
||
http://ea.cmdp.cn/apm/v1/ability/api000353
|
||
|
||
输入参数[post]
|
||
{
|
||
"_data": {
|
||
"searchType": "stock",
|
||
"secCode": "000002" //证券代码
|
||
}
|
||
}
|
||
"""
|
||
url = "http://a.cmdp.cn/apm/v1/ability/api000353"
|
||
body = {
|
||
"_data": {
|
||
"searchType": "stock",
|
||
"secCode": security_code
|
||
}
|
||
}
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
response = requests.post(url, json=body, headers=headers)
|
||
return response.json().get("data", {})
|
||
|
||
@staticmethod
|
||
def get_report_rule(tags: list[str]):
|
||
"""
|
||
|
||
四.尽调报告的规则文件地址查询
|
||
说明
|
||
接口地址
|
||
https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_list/usys/pbcs/release
|
||
|
||
输入参数[post]
|
||
{
|
||
"opUserId": "genesys_app",
|
||
"sourceUserId": "genesys_app",
|
||
"filter":"'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.pbcMeta.name' eq '尽调报告云盘项目名称,需要问@冯昕宇'",【企业售前画像报告】
|
||
"tags":["报告年度_2023","biz_项目主体类别_单体","biz_业务场所_深交所"]
|
||
}
|
||
"""
|
||
url = "https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_list/usys/pbcs/release"
|
||
body = {
|
||
"opUserId": "genesys_app",
|
||
"sourceUserId": "genesys_app",
|
||
"filter": f"'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.pbcMeta.name' eq '企业售前画像报告'",
|
||
"tags": tags
|
||
}
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
response = requests.post(url, json=body, headers=headers)
|
||
return response.json()
|
||
|
||
@staticmethod
|
||
def get_report_info(user_id: str, security_code: str, rule_path: str):
|
||
"""
|
||
四.尽调报告执行
|
||
说明
|
||
执行后报告需要生成ticket的下载地址
|
||
接口地址
|
||
https://a.cmdp.cn/v1/cmp/cal/fn/PerformRuleFile
|
||
输入参数[post]
|
||
{
|
||
"issync": 2,
|
||
"ruleobj": "temp-003/rule.json",
|
||
"dataobj": {
|
||
"uid": "用户ID", //填入真实用户ID,执行后会在微信公众收到结果消息,可以直接打开报告
|
||
"_data": {
|
||
"secCode": "000002"
|
||
},
|
||
"_var": {
|
||
"config": {
|
||
"prodName": "企业售前画像报告"
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
"""
|
||
url = "https://a.cmdp.cn/v1/cmp/cal/fn/PerformRuleFile"
|
||
body = {
|
||
"issync": 2, # 同步执行
|
||
"ruleobj": rule_path,
|
||
"dataobj": {
|
||
"uid": user_id,
|
||
"_data": {
|
||
"secCode": security_code
|
||
},
|
||
"_var": {
|
||
"config": {
|
||
"prodName": "企业售前画像报告"
|
||
}
|
||
}
|
||
}}
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
response = requests.post(url, json=body, headers=headers)
|
||
return response.json().get("dumpmsg", {}).get("_out", {})
|
||
|
||
@staticmethod
|
||
def get_regression_testing(eid: str):
|
||
|
||
"""
|
||
执行后报告需要生成ticket的下载地址:2024年半年报
|
||
@卢天宝说明一下文件存储路径,@常连钰直接根据文件存储路径生成下载ticket,但是需要判断是否存在:https://a.cmdp.cn/basiceg/v1/csp/exist/{path}
|
||
存储路径:announce02/{textId}._val_01.json , announce02/{textId}._val_01.docx
|
||
textId获取:
|
||
请求地址(post):http://a.cmdp.cn/cnd/v1/entity/search/cmdb_bfmapping
|
||
参数 :
|
||
{
|
||
"tags": [
|
||
"报告期_2024QR1", //替换为真实报告期
|
||
"GE8002005E_100000000002334460" //GE8002005E_{eid}
|
||
]
|
||
}
|
||
返回示例:
|
||
{
|
||
"total": 1,
|
||
"records": [
|
||
{
|
||
"eid": "100000000037315576",
|
||
"tags": {
|
||
"公司名称_上海凌云实业发展股份有限公司": true,
|
||
"GE8002005E_100000000002334460": true,
|
||
"证券代码_900957": true,
|
||
"报告期_2024QR1": true,
|
||
"TEXTID_1219815286": true, //其中1219815286就是text的值
|
||
"公告标题_凌云B股:凌云B股2024年第一季度报告": true
|
||
}
|
||
}
|
||
]
|
||
}
|
||
"""
|
||
|
||
def get_text_id(report_date, entity_id):
|
||
url = "https://a.cmdp.cn/cnd/v1/entity/search/cmdb_bfmapping"
|
||
body = {
|
||
"tags": [
|
||
f"报告期_{report_date}",
|
||
f"GE8002005E_{entity_id}"
|
||
]
|
||
}
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
response = requests.post(url, json=body, headers=headers)
|
||
result = response.json()
|
||
tags_ = result.get("records", [{}])
|
||
if tags_:
|
||
tags = tags_[0].get("tags", [{}])
|
||
else:
|
||
tags = []
|
||
|
||
text_id = None
|
||
for tag in tags:
|
||
l = tag.split("_")
|
||
if l[0] == "TEXTID":
|
||
text_id = l[1]
|
||
return text_id
|
||
|
||
def get_report_datetime():
|
||
report_datetime = time.localtime(time.time())
|
||
year = report_datetime.tm_year
|
||
month = report_datetime.tm_mon
|
||
if month >= 9:
|
||
return [f"{year}SAR", f"{year - 1}AR"]
|
||
elif month >= 4:
|
||
return [f"{year - 1}SAR", f"{year - 1}AR"]
|
||
else:
|
||
return [f"{year - 1}SAR", f"{year - 2}AR"]
|
||
|
||
def get_report(tid):
|
||
if not tid:
|
||
return None
|
||
docx_path = f"announce02/{tid}_val_01.docx"
|
||
json_path = f"announce02/{tid}_val_01.json"
|
||
if file_exists(json_path) and file_exists(docx_path):
|
||
return flag, get_report_ticket("s3://" + docx_path, "回归测试报告.docx")
|
||
else:
|
||
print("file not exist")
|
||
return None, None
|
||
|
||
flags = get_report_datetime()
|
||
result = ""
|
||
for flag in flags:
|
||
text_id = get_text_id(flag, eid)
|
||
f, r = get_report(text_id)
|
||
if r:
|
||
result += f'<br><p color="black">{f}:</p><a href="{r}" target="_blank" style="color: rgb(38, 126, 240); text-decoration-line: underline; outline-style: none; cursor: pointer; font-size: 16px; font-weight: 400;">{r}</a>'
|
||
if result != "":
|
||
return result
|
||
else:
|
||
return "暂无"
|
||
|
||
@staticmethod
|
||
def get_current_product_id():
|
||
now = time.localtime(time.time())
|
||
year = now.tm_year
|
||
month = now.tm_mon
|
||
if month >= 5:
|
||
year = year - 1
|
||
else:
|
||
year = year - 2
|
||
url = "https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_listall/usys/products/release"
|
||
body = {
|
||
"opUserId": "genesys_app",
|
||
"sourceUserId": "genesys_app",
|
||
"filter": "'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.resource.label' eq '小智智填-定期报告专版'",
|
||
"tags": ["tech_sbom", f"prod_year_{year}", "type_入口", "prod_小智转写"]
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code == 200:
|
||
response_json = response.json()
|
||
code = response_json.get("code", -1)
|
||
if code == 200:
|
||
data = response_json.get("data", {})
|
||
records = data.get("records", [])
|
||
if records:
|
||
widget_info = records[0].get("fileInfo", {}).get("extends", {}).get("widget", {})
|
||
product_id = widget_info.get("id")
|
||
return product_id
|
||
return None
|
||
|
||
@staticmethod
|
||
def get_smart_genie_product(entity_id: str, product_id: str):
|
||
"""
|
||
说明
|
||
选样本后的订购【打印或复制】小智类产品
|
||
内部接口,eid和个人uid是输入的。
|
||
产品选择是输入产品id:
|
||
产品 产品id 说明
|
||
小智复核-定期报告专版 已有产品,示例
|
||
小智智填-定期报告专版 1854436156570607618 产品id待补充
|
||
|
||
接口地址
|
||
https://cloud.cmdp.cn/api/product/deliver/print
|
||
|
||
输入参数[post]
|
||
{
|
||
"eid": "eid,不填就是查询用户所属公司的",
|
||
"pid": "必填,入口产品id",
|
||
"priceType":"产品版本,枚举:体验、标准版、旗舰版、特别优惠版";售前产品固定:标准版
|
||
}
|
||
输入示例
|
||
{
|
||
"eid": "100000000002334059",
|
||
"pid": "1858326969193848833",
|
||
"priceType":"标准版"
|
||
}
|
||
返回参数
|
||
{"Code":"状态码","data":"入口产品id"}
|
||
返回示例
|
||
|
||
{"Code":200,"data":"1853245209740582914"}
|
||
打印成功后会有公众号通知
|
||
|
||
|
||
"""
|
||
|
||
url = "https://cloud.cmdp.cn/api/product/deliver/print"
|
||
body = {
|
||
"eid": entity_id,
|
||
"pid": product_id if product_id else "",
|
||
"priceType": "体验版"
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code == 200:
|
||
response_json = response.json()
|
||
code = response_json.get("code", -1)
|
||
if code == 200:
|
||
return response_json.get("data", "")
|
||
return None
|
||
|
||
@staticmethod
|
||
def get_product_qrcode(entity_id: str, product_id: str):
|
||
"""
|
||
接口示例:
|
||
标签查fid:
|
||
curl --location --request POST "http://10.0.0.39/octopus/usr/genesys_app/file/query" ^
|
||
--header "User-Agent: Apifox/1.0.0 (https://apifox.com)" ^
|
||
--header "Content-Type: application/json" ^
|
||
--header "Accept: */*" ^
|
||
--header "Host: 10.0.0.39" ^
|
||
--header "Connection: keep-alive" ^
|
||
--data-raw "{ \"Category\": \"default\", \"ownerID\":\"genesys_app\", \"query\": \"\\\"uid_1652264045856165888\\\"\", \"path\":\"/system/users\", \"includeHistory\":false}"
|
||
|
||
fid查文件:
|
||
curl --location --request GET "http://10.0.0.39/octopus/usr/genesys_app/file/1-23823/meta" ^
|
||
--header "User-Agent: Apifox/1.0.0 (https://apifox.com)" ^
|
||
--header "Accept: */*" ^
|
||
--header "Host: 10.0.0.39" ^
|
||
--header "Connection: keep-alive"
|
||
|
||
query查询标签:
|
||
查询标签 【"eid_{eid}","pid_{pid}","priceType_标准版"】
|
||
|
||
|
||
"""
|
||
|
||
def get_file_by_fid(file_id):
|
||
url = f"http://10.0.0.39/octopus/usr/genesys_app/file/{file_id}/meta"
|
||
|
||
try:
|
||
response = requests.get(url)
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
return {}
|
||
except Exception:
|
||
return {}
|
||
|
||
def get_file_id(entity_id: str, product_id: str):
|
||
url = "http://10.0.0.39/octopus/usr/genesys_app/file/query"
|
||
body = {
|
||
"Category": "default",
|
||
"ownerID": "genesys_app",
|
||
"query": f"\"eid_{entity_id}\" & \"pid_{product_id}\" & \"priceType_标准版\"",
|
||
"path": "/system/users",
|
||
"includeHistory": False
|
||
}
|
||
response = requests.post(url, json=body)
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
return []
|
||
|
||
if not product_id:
|
||
return None
|
||
file_ids = get_file_id(entity_id, product_id)
|
||
|
||
if not file_ids:
|
||
return None
|
||
file_info = get_file_by_fid(file_ids[0])
|
||
|
||
return file_info.get("uri")
|
||
|
||
@staticmethod
|
||
def send_company_mail(to_address: str, subject: str, content: str):
|
||
url = "https://a.cmdp.cn/apm/v1/ability/api000357"
|
||
body = {"_data": {
|
||
"type": "html",
|
||
"tos": [to_address],
|
||
"subject": subject,
|
||
"text": content
|
||
}
|
||
}
|
||
response = requests.post(url, json=body)
|
||
return response.json()
|
||
|
||
@staticmethod
|
||
def get_qrcode_base64(product_qrcode_path):
|
||
if not product_qrcode_path:
|
||
return ""
|
||
response = requests.get(product_qrcode_path)
|
||
if response.status_code == 200:
|
||
return base64.b64encode(response.content).decode('utf-8')
|
||
else:
|
||
return ""
|
||
|
||
@staticmethod
|
||
def get_information_disclosure_picture(year: str, security_code: str):
|
||
basiceg_url = "http://a.cmdp.cn/basiceg/v1/bytes"
|
||
file_path = f"/usercentre03-dev/user/cache/{year}/信披成绩单/{security_code}_idsr.jpg"
|
||
static_path = f"/product-data/static/xpcjd/{security_code}_idsr.jpg"
|
||
if file_exists(file_path):
|
||
resp = requests.get(basiceg_url + file_path)
|
||
if resp.status_code == 200:
|
||
data = resp.content
|
||
requests.put(basiceg_url + static_path, data=data)
|
||
return f'<img src="http://www.cmdp.cn/a/s/xpcjd/{security_code}_idsr.jpg" width="100%">'
|
||
else:
|
||
return ""
|
||
|
||
|
||
st = SalesTools()
|
||
|
||
|
||
class CalendarEventDB(CalendarDB):
|
||
|
||
def _transfer_calendar_data(self, data: dict) -> dict:
|
||
pass
|
||
|
||
def trigger(self, data: dict):
|
||
security_code = data.get("company_code")
|
||
user_list = st.get_user_info(security_code)
|
||
# 获取公司信息
|
||
company_info = st.get_org_info(security_code)
|
||
eid = company_info.get("eid")
|
||
full_name = company_info.get("fullName")
|
||
# 打标签
|
||
if user_list:
|
||
for user in user_list:
|
||
user_id = user.get("id")
|
||
st.update_user_tag(user_id)
|
||
# 获取bp id
|
||
bp_info = self.cloud_connection.get_user_info_by_name(self.name)
|
||
bp_id = bp_info.get("id")
|
||
bp_mail = bp_info.get("email")
|
||
|
||
# 获取报告文件
|
||
now = time.localtime(time.time())
|
||
year = now.tm_year
|
||
month = now.tm_mon
|
||
if month >= 5:
|
||
year = year - 1
|
||
else:
|
||
year = year - 2
|
||
report_tags = [f"报告年度_{year}", "biz_项目主体类别_单体",
|
||
"biz_业务场所_深交所"]
|
||
report_rule_info = st.get_report_rule(report_tags)
|
||
report_rule_path = report_rule_info.get("data", {}).get("records", "")
|
||
if report_rule_path:
|
||
report_rule_path = report_rule_path[0].get("filePath", "")
|
||
report_info = st.get_report_info(bp_id, security_code, report_rule_path)
|
||
report_ticket = report_info.get("data").get("path")
|
||
regression_testing = st.get_regression_testing(eid)
|
||
with open("template.html", "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
content = content.replace("{{report_ticket}}", report_ticket)
|
||
content = content.replace("{{regression_testing}}", regression_testing)
|
||
# content = content.replace("{{qrcode}}", qrcode)
|
||
content = content.replace("{{picture}}", st.get_information_disclosure_picture("2024", security_code))
|
||
st.send_company_mail(bp_mail, f"客户拜访准备文件-{full_name}[{security_code}]", content)
|
||
ssp.add_row(doc_id=f_id, sheet_id="tTMuVB", records=[
|
||
{
|
||
"values": {
|
||
"证券代码": [
|
||
{
|
||
"type": "text",
|
||
"text": data["company_code"]
|
||
}
|
||
],
|
||
"证券简称": [{
|
||
"type": "text",
|
||
"text": data["company_name"]
|
||
}]
|
||
,
|
||
"地点": [{
|
||
"type": "text",
|
||
"text": data["company_address"]
|
||
}]
|
||
,
|
||
"拜访时间": data["visit_time"] + "000"
|
||
,
|
||
"结束时间": data["end_time"] + "000",
|
||
"拜访人": [{
|
||
"type": "text",
|
||
"text": data["_uid"]
|
||
}]
|
||
|
||
}
|
||
}])
|
||
|
||
|
||
ceb = CalendarEventDB()
|
||
|
||
|
||
def get_approval(template_id):
|
||
approval_list = wapi.get_approval_list_weekly(template_id)
|
||
info = {}
|
||
for approval_id in approval_list:
|
||
approval_info = wapi.get_approval_info(approval_id)
|
||
approval_info = parse_data(approval_info["info"],
|
||
{"Number-1697103952657": "company_code", "Text-1697103919621": "company_name",
|
||
"Text-1728374569134": "company_address", "Date-1728370374837": "visit_time",
|
||
"Date-1728370485478": "end_time"})
|
||
_uid = approval_info["_uid"]
|
||
if _uid not in info:
|
||
_u = wup.get_user_info(_uid)
|
||
info[_uid] = {"name": _u["name"]}
|
||
else:
|
||
_p = info[_uid]
|
||
company_code = approval_info["company_code"]
|
||
if company_code not in _p:
|
||
_p[company_code] = 1
|
||
else:
|
||
_p[company_code] += 1
|
||
return info
|
||
|
||
|
||
def generate_approval_report_html(times_report):
|
||
htms = "<!DOCTYPE html><html lang='en'><head><meta charset='UTF-8'><title></title><style>table{width:100%;border-collapse:collapse;font-family:Arial,sans-serif;box-shadow:0 2px 10px rgba(0,0,0,0.1)}caption{font-size:2em;font-weight:bold;margin:1em 0}th,td{border:1px solid#ccc;text-align:center;padding:15px;transition:background-color 0.3s}thead tr{background-color:#007BFF;color:#fff}tbody tr:nth-child(odd){background-color:#f9f9f9}tbody tr:hover{background-color:#e2e6ea}tfoot tr td{text-align:right;padding-right:20px}th{border-bottom:2px solid#007BFF}</style> </head><body><table border='1'><tr><th>姓名</th><th>公司代码</th><th>次数</th></tr>"
|
||
htme = "</table></body></html>"
|
||
content = ""
|
||
for key in times_report:
|
||
key_report = times_report[key]
|
||
name = key_report["name"]
|
||
key_report_list = [(i, key_report[i]) for i in key_report if i != "name"]
|
||
size = len(key_report_list)
|
||
for idx, key_info in enumerate(key_report_list):
|
||
if idx == 0:
|
||
content += f"<tr><td rowspan='{size}'>{name}</td><td>{key_info[0]}</td><td>{key_info[1]}</td></tr>"
|
||
else:
|
||
content += f"<tr><td>{key_info[0]}</td><td>{key_info[1]}</td></tr>"
|
||
return htms + content + htme
|
||
|
||
|
||
def new_product_print(info):
|
||
url = "https://cloud.cmdp.cn/api/product/print/customize"
|
||
body = {"userId": info["user_id"], "secCode": info["company_code"], "prodName": info["service_name"],
|
||
"opUserId": info["_uid"]}
|
||
r = requests.post(url, json=body)
|
||
return r.json()
|