#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""WeCom (WXwork) API clients and the sales visit-preparation workflow.

The module wraps a handful of WeCom endpoints (access token, approvals,
calendars, schedules, users, documents / smart sheets), an internal
"cloud function" user service, and several cmdp.cn HTTP endpoints.  On top of
those it builds ``CalendarEventDB``, which turns an approved visit request
into a calendar entry, a preparation e-mail and a smart-sheet row.

Importing this module builds the module-level client singletons but performs
no network I/O; tokens are fetched lazily on first use.
"""
import base64
import json
import time
from abc import abstractmethod

import requests


class WXworkAuth:
    """Fetch and cache the WeCom access token of one application."""

    # Lifetime, in seconds, after which a cached token is fetched again.
    TOKEN_TTL_SECONDS = 7200

    def __init__(self, app_id, app_key):
        """Store the credentials; no request is made until a token is needed.

        Args:
            app_id: Sent as ``corpid`` to the ``gettoken`` endpoint.
            app_key: Sent as ``corpsecret`` to the ``gettoken`` endpoint.
        """
        self.crop_id = app_id
        self.crop_secret = app_key
        self.access_token = None
        self.timestamp = None

    def _get_access_token(self):
        """Request a fresh token and cache it on the instance.

        Raises:
            Exception: If the response carries no ``access_token`` field.
        """
        # The timestamp is taken before the request, so the cached token is
        # considered slightly older than it really is.
        self.timestamp = int(time.time())
        url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.crop_id}&corpsecret={self.crop_secret}"
        response = requests.get(url)
        access_info = response.json()
        if 'access_token' in access_info:
            self.access_token = access_info['access_token']
        else:
            raise Exception('获取access_token失败')

    def get_access_token(self):
        """Return the cached token, refreshing it when missing or too old."""
        _now = int(time.time())
        if not self.access_token or (_now - self.timestamp) > self.TOKEN_TTL_SECONDS:
            self.refresh_token()
        return self.access_token

    def refresh_token(self):
        """Fetch a new token, retrying exactly once after a failure.

        The first failure is printed and swallowed on purpose; a second
        failure propagates to the caller.
        """
        try:
            self._get_access_token()
        except Exception as e:
            print(e, '重新获取')
            self._get_access_token()


class CloudFunctionAPI:
    """Client for the internal user service rooted at ``base_url``."""

    def __init__(self, base_url):
        self.base_url = base_url

    def get_user_info_by_id(self, user_id: str) -> dict:
        """Return the ``data`` object of ``GET /in/api/user``, or ``{}``.

        ``{}`` is returned for a non-200 status and when ``data`` is absent
        or null.
        """
        url = f"{self.base_url}/in/api/user"
        response = requests.get(url, params={"uid": user_id})
        if response.status_code != 200:
            return {}
        return response.json().get('data') or {}

    def get_user_info_by_sec(self, security_code: str) -> list:
        """Query users by security code via ``POST /in/api/userQuery``.

        Returns:
            ``[]`` for a non-200 status, ``[{}]`` when the response has no
            ``list`` field (kept for backward compatibility), otherwise the
            ``list`` value as returned by the service.
        """
        url = f"{self.base_url}/in/api/userQuery"
        body = {
            "secCode": security_code,
            "status": 10,
            "PageSize": 10,
            "PageNo": 1,
        }
        response = requests.post(url, json=body)
        if response.status_code != 200:
            return []
        user_info = response.json().get("list")
        if user_info is None:
            user_info = [{}]
        return user_info

    def get_user_info_by_name(self, user_name: str) -> dict:
        """Return the first user matching ``loginName``, or ``{}``.

        ``{}`` is returned for a non-200 status and for a missing or empty
        ``list`` (an empty list used to raise ``IndexError``).
        """
        url = f"{self.base_url}/in/api/userQuery"
        body = {
            "loginName": user_name,
            "status": 10,
            "PageSize": 10,
            "PageNo": 1,
        }
        response = requests.post(url, json=body)
        if response.status_code != 200:
            return {}
        matches = response.json().get("list")
        if not matches:
            return {}
        return matches[0]

    @staticmethod
    def parse_tags(tags: list):
        """Turn ``"a_b_c"`` style tags into a nested dict ending in ``True``.

        ``["calendar_id_X", "calendar_id_Y"]`` becomes
        ``{"calendar": {"id": {"X": True, "Y": True}}}``.  Tags sharing a
        prefix are merged; a longer tag replaces a shorter leaf on the same
        path, and a shorter tag never discards an existing subtree.

        Args:
            tags: Iterable of tag strings; ``None`` is treated as empty.

        Returns:
            The nested dict described above.
        """
        tree = {}
        for tag in tags or []:
            parts = tag.split("_")
            node = tree
            for part in parts[:-1]:
                child = node.get(part)
                if not isinstance(child, dict):
                    # Either missing or a leaf left by a shorter tag.
                    child = {}
                    node[part] = child
                node = child
            node.setdefault(parts[-1], True)
        return tree

    def get_user_tags(self, user_id: str = "", user_name: str = "") -> dict:
        """Return the parsed tags of a user looked up by id, else by name.

        Returns ``{}`` when neither identifier is given.
        """
        if user_id:
            user_info = self.get_user_info_by_id(user_id)
        elif user_name:
            user_info = self.get_user_info_by_name(user_name)
        else:
            return {}
        return self.parse_tags(user_info.get('tags', []))

    def update_user_tag(self, user_id: str, key: str, value: str):
        """Attach the single tag ``"{key}_{value}"`` to a user.

        Returns the decoded response, or ``{}`` for a non-200 status.
        """
        return self.update_user_tags(user_id, {key: value})

    def update_user_tag_by_name(self, user_name: str, key: str, value: str):
        """Resolve ``user_name`` to an id and attach ``"{key}_{value}"``.

        Returns ``{}`` when the user cannot be resolved.
        """
        user_id = self.get_user_info_by_name(user_name).get('id')
        if not user_id:
            return {}
        return self.update_user_tag(user_id, key, value)

    def update_user_tags(self, user_id: str, tags: dict):
        """Attach one ``"{key}_{value}"`` tag per item of ``tags``.

        Posts to ``/in/api/userTags``; returns the decoded response, or
        ``{}`` for a non-200 status.
        """
        url = f"{self.base_url}/in/api/userTags"
        body = {
            "userId": user_id,
            "tags": [f"{key}_{value}" for key, value in tags.items()],
        }
        response = requests.post(url, json=body)
        if response.status_code != 200:
            return {}
        return response.json()


class WXworkBaseAPI:
    """Base class giving WeCom clients a lazily refreshed access token."""

    def __init__(self, auth: WXworkAuth):
        self.auth = auth

    @property
    def access_token(self):
        """Current token from the shared ``WXworkAuth`` (may hit the network)."""
        return self.auth.get_access_token()


class WXworkApprovalAPI(WXworkBaseAPI):
    """Approval (OA) endpoints."""

    def get_approval_list(self, template_id: str, start_time: int, end_time: int) -> list:
        """Return approval numbers of a template within a time window.

        The request filters on ``sp_status`` ``"2"`` and asks for one page of
        at most 100 entries; ``new_cursor`` is always sent empty, so no
        further pages are fetched.

        Returns:
            The ``sp_no_list`` of the response; ``[]`` for a non-200 status
            or when the field is missing.
        """
        url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovalinfo?access_token={self.access_token}"
        body = {
            "starttime": str(start_time),
            "endtime": str(end_time),
            "new_cursor": "",
            "size": 100,
            "filters": [
                {"key": "template_id", "value": template_id},
                {"key": "sp_status", "value": "2"},
            ],
        }
        response = requests.post(url, json=body)
        if response.status_code != 200:
            return []
        # Error payloads carry no "sp_no_list"; treat them as "nothing found".
        return response.json().get("sp_no_list", [])

    def get_approval_info(self, approval_id: str) -> dict:
        """Return the decoded ``getapprovaldetail`` response for one approval."""
        url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovaldetail?access_token={self.access_token}"
        response = requests.post(url, json={"sp_no": approval_id})
        return response.json()

    def get_approval_list_weekly(self, template_id) -> list:
        """Return approvals in the 7 days ending at the coming Sunday.

        The window end is "now" shifted forward by the number of days until
        Sunday (0 when today is Sunday), keeping the current time of day.
        """
        current_time = time.time()
        # tm_wday: 0 = Monday ... 6 = Sunday.
        weekday = time.localtime(current_time).tm_wday
        days_until_sunday = (6 - weekday) % 7
        this_weekday = int(current_time + (days_until_sunday * 86400))
        last_weekday = int(this_weekday - 7 * 24 * 3600)
        return self.get_approval_list(template_id, last_weekday, this_weekday)


class WXworkCalendarAPI(WXworkBaseAPI):
    """Calendar and schedule endpoints."""

    def create_calendar(self, calendar_name, userid):
        """Create a private default calendar owned by and shared with ``userid``.

        Returns:
            The new ``cal_id`` string on success; otherwise the whole decoded
            response dict, so callers must check the type.
        """
        url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/add?access_token={self.access_token}"
        body = {
            "calendar": {
                "admins": [userid],
                "set_as_default": 1,
                "summary": calendar_name,
                "color": "#FF3030",
                "description": "访问日历",
                "is_public": 0,
                "is_corp_calendar": 0,
                "shares": [
                    {"userid": userid, "permission": 1},
                ],
            }
        }
        cinfo = requests.post(url, json=body).json()
        if "cal_id" in cinfo:
            return cinfo["cal_id"]
        return cinfo

    def delete_calendar(self, calendar_id):
        """Delete a calendar; returns the decoded response."""
        url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/del?access_token={self.access_token}"
        return requests.post(url, json={"cal_id": calendar_id}).json()

    def get_calendar_list(self, calendar_id):
        """Return the first page (up to 1000) of schedules of a calendar."""
        url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get_by_calendar?access_token={self.access_token}"
        body = {"cal_id": calendar_id, "offset": 0, "limit": 1000}
        return requests.post(url, json=body).json()

    def create_schedule(self, calendar_id, schedule_name, start_time, end_time, user_id, content, address=""):
        """Create a schedule with a reminder 3600 seconds before the event.

        When ``start_time`` equals ``end_time`` the end is moved one second
        later (and becomes a string).

        Returns:
            The decoded response.
        """
        url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/add?access_token={self.access_token}"
        if start_time == end_time:
            end_time = str(int(start_time) + 1)
        body = {
            "schedule": {
                "admins": [user_id],
                "start_time": start_time,
                "end_time": end_time,
                "is_whole_day": 0,
                "attendees": [{"userid": user_id}],
                "summary": schedule_name,
                "description": content,
                "reminders": {
                    "is_remind": 1,
                    "remind_before_event_secs": 3600,
                    "timezone": 8,
                },
                "location": address,
                "cal_id": calendar_id,
            }
        }
        return requests.post(url, json=body).json()

    def get_schedule(self, schedule_id):
        """Return the decoded ``schedule/get`` response for one schedule id."""
        url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get?access_token={self.access_token}"
        return requests.post(url, json={"schedule_id_list": [schedule_id]}).json()

    def delete_schedule(self, schedule_id):
        """Delete a schedule; returns the decoded response."""
        url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/del?access_token={self.access_token}"
        return requests.post(url, json={"schedule_id": schedule_id}).json()


class WXworkUserInfoAPI(WXworkBaseAPI):
    """Member lookup endpoint."""

    def get_user_info(self, user_id):
        """Return the decoded ``user/get`` response for ``user_id``."""
        url = f"https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token={self.access_token}&userid={user_id}"
        return requests.get(url).json()


class WXworkDocAPI:
    """WeDrive / WeDoc / smart-sheet endpoints bound to one space.

    Every public method returns the decoded response, or ``{}`` when the HTTP
    status is not 200.
    """

    def __init__(self, auth: WXworkAuth, space_id):
        self.space_id = space_id
        self.auth = auth

    @property
    def access_token(self):
        """Current token from ``auth``.

        Resolved on every use rather than once in ``__init__`` so that
        constructing the client does no network I/O and long-lived instances
        do not keep an expired token.
        """
        return self.auth.get_access_token()

    def _post(self, path: str, body: dict) -> dict:
        """POST ``body`` to ``cgi-bin/{path}``; ``{}`` on a non-200 status."""
        url = f"https://qyapi.weixin.qq.com/cgi-bin/{path}?access_token={self.access_token}"
        response = requests.post(url, json=body)
        if response.status_code != 200:
            return {}
        return response.json()

    def create_space(self, space_name):
        """Create a space named ``space_name``.

        NOTE(review): the member entry is hard-coded to userid ``"lychang"``.
        """
        body = {
            "space_name": space_name,
            "auth_info": [{
                "type": 1,
                "userid": "lychang",
                "auth": 7,
            }],
            "space_sub_type": 0,
        }
        return self._post("wedrive/space_create", body)

    def get_space_info(self):
        """Return information about the bound space."""
        return self._post("wedrive/space_info", {"spaceid": self.space_id})

    def get_space_file_list(self):
        """List up to 100 entries directly under the bound space."""
        body = {
            "spaceid": self.space_id,
            "fatherid": self.space_id,
            "sort_type": 1,
            "start": 0,
            "limit": 100,
        }
        return self._post("wedrive/file_list", body)

    def get_document_info(self, doc_id):
        """Fetch a document; also remembers ``doc_id`` on the instance."""
        self.doc_id = doc_id
        return self._post("wedoc/document/get", {"docid": self.doc_id})

    def get_table_info(self, doc_id, sheet_id, view_ids: list):
        """Query views of a smart sheet (offset 0, limit 1)."""
        body = {
            "docid": doc_id,
            "sheet_id": sheet_id,
            "view_ids": view_ids,
            "offset": 0,
            "limit": 1,
        }
        return self._post("wedoc/smartsheet/get_views", body)

    def get_table_data(self, doc_id, sheet_id, view_id):
        """Return up to 100 records of a view, keyed by field title."""
        body = {
            "docid": doc_id,
            "sheet_id": sheet_id,
            "view_id": view_id,
            "record_ids": [],
            "key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
            "field_titles": [],
            "field_ids": [],
            "sort": [],
            "offset": 0,
            "limit": 100,
        }
        return self._post("wedoc/smartsheet/get_records", body)

    def get_sheet_data(self, doc_id):
        """Return the ``smartsheet/get_sheet`` response for a document."""
        return self._post("wedoc/smartsheet/get_sheet", {"docid": doc_id})

    def create_table(self, doc_name, user_ids: list, father_id=None):
        """Create a smart sheet (``doc_type`` 10).

        ``doc_type`` values noted by the original author: 3 document,
        4 spreadsheet, 10 smart sheet.

        When the client is bound to a space the document is placed in it,
        under ``father_id`` if given, otherwise at the space root.  Without
        a bound space neither ``spaceid`` nor ``fatherid`` is sent.
        """
        body = {
            "doc_type": 10,
            "doc_name": doc_name,
            "admin_users": user_ids,
        }
        if self.space_id:
            body["spaceid"] = self.space_id
            body["fatherid"] = father_id if father_id else self.space_id
        return self._post("wedoc/create_doc", body)

    def add_row(self, doc_id, sheet_id, records: list):
        """Append ``records`` (keyed by field title) to a smart sheet."""
        body = {
            "docid": doc_id,
            "sheet_id": sheet_id,
            "key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
            "records": records,
        }
        return self._post("wedoc/smartsheet/add_records", body)


def parse_data(data: dict, mapping: dict) -> dict:
    """Flatten an approval detail into ``{field_name: value}``.

    Args:
        data: The ``info`` object of an approval detail; ``applyer.userid``
            and ``apply_data.contents`` are read from it.
        mapping: ``{control_id: output_field_name}``.

    Returns:
        One entry per mapping value (``None`` when the control is absent)
        plus ``"_uid"`` holding the applicant's userid.  ``Text`` yields the
        text, ``Number`` the ``new_number``, ``Date`` the ``s_timestamp``,
        a multi ``Selector`` the list of option keys, a single ``Selector``
        the first option key (``""`` when no option is selected); any other
        control yields ``""``.
    """
    user_id = data.get("applyer").get("userid")
    values_by_id = {}
    for item in data.get("apply_data").get("contents"):
        title = item.get("title")[0]["text"]
        control = item.get("control")
        value = item.get("value")
        if control == "Text":
            result = value.get("text")
        elif control == "Number":
            result = value.get("new_number")
        elif control == "Date":
            result = value.get("date").get("s_timestamp")
        elif control == "Selector":
            # Options live under value["selector"], the same object the type
            # is read from; the item itself has no "selector" key.
            selector = value.get("selector")
            options = selector.get("options", [])
            if selector.get("type") == "multi":
                result = [option["key"] for option in options]
            else:
                result = options[0]["key"] if options else ""
        else:
            result = ""
        values_by_id[item.get("id")] = {"key": title, "value": result}
    flattened = {
        field: values_by_id.get(control_id, {}).get("value")
        for control_id, field in mapping.items()
    }
    flattened["_uid"] = user_id
    return flattened


def _latest_report_year() -> int:
    """Return last year from May onwards, otherwise the year before last."""
    now = time.localtime(time.time())
    return now.tm_year - 1 if now.tm_mon >= 5 else now.tm_year - 2


# NOTE(review): corp id, application secrets and document ids are hard-coded
# below.  Secrets in source control are a security risk; consider loading
# them from the environment or a secret store.
crop_id = "wx98dfa35ad7e4b271"
space_id = "s.wx98dfa35ad7e4b271.744621152iSk"
f_id = 'dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw'
wxc_app_secret = "gyN5moBa6Ev1Vd0ZLUVrsEtAO_goppWKUBdsnSng-Ks"
wapi_app_secret = "oM0hwIi8GRPw6HWk9o5__g3v5ziz5CGyBUo2FASSrVw"

wxc_auth = WXworkAuth(crop_id, wxc_app_secret)
wapi_auth = WXworkAuth(crop_id, wapi_app_secret)

cf_api = CloudFunctionAPI("http://10.0.0.39/cloud")
wxc_api = WXworkCalendarAPI(wxc_auth)
wapi = WXworkApprovalAPI(wapi_auth)
wup = WXworkUserInfoAPI(wapi_auth)
ssp = WXworkDocAPI(wxc_auth, space_id)


def file_exists(file_path):
    """Ask the ``csp/exist`` endpoint whether ``file_path`` exists.

    Returns the response's ``exist`` field, ``False`` when it is absent.
    """
    # The path is interpolated exactly once; a second str.format() pass would
    # raise on paths that contain "{" or "}".
    _url = f"https://a.cmdp.cn/basiceg/v1/csp/exist/{file_path}"
    response = requests.get(_url)
    return response.json().get("exist", False)


class CalendarDB:
    """Per-user calendar facade; subclasses supply ``trigger``.

    NOTE(review): the class does not derive from ``abc.ABC``, so the
    ``@abstractmethod`` markers below are not enforced at instantiation.
    """

    def __init__(self):
        self.name = None
        self.user_id = None
        self.user_info = None
        self.calendar_id = None
        self.calendar_connection = None
        self.cloud_connection = None

    def login(self, login_user: str):
        """Bind to ``login_user`` and locate or create their visit calendar.

        The calendar id is read from the user's ``calendar_id_<id>`` tag.
        When no such tag exists a calendar is created and, if creation
        returned an id, the tag is written back.

        Returns:
            ``{"calendar": <calendar id or None>}``.
        """
        self.name = login_user
        self.calendar_connection = wxc_api
        self.cloud_connection = cf_api
        user_info = self.cloud_connection.get_user_info_by_name(user_name=self.name)
        self.user_id = user_info.get('id')
        user_tags = self.cloud_connection.parse_tags(user_info.get('tags', []))
        calendar_tags = user_tags.get('calendar', {})
        id_tags = calendar_tags.get('id', {}) if isinstance(calendar_tags, dict) else {}
        calendar_ids = list(id_tags) if isinstance(id_tags, dict) else []
        self.calendar_id = calendar_ids[0] if calendar_ids else None
        if not self.calendar_id:
            created = self.create_calendar("访问日历")
            # create_calendar hands back the raw error payload (a dict) on
            # failure; only a string is a usable calendar id.
            self.calendar_id = created if isinstance(created, str) else None
            if self.calendar_id:
                self.cloud_connection.update_user_tag(user_id=self.user_id, key="calendar_id",
                                                      value=self.calendar_id)
        return {"calendar": self.calendar_id}

    def create_calendar(self, calendar_name: str):
        """Create a calendar for the logged-in user; see ``WXworkCalendarAPI``."""
        return self.calendar_connection.create_calendar(calendar_name, self.name)

    @abstractmethod
    def _transfer_calendar_data(self, data: dict) -> dict:
        pass

    def get_all(self):
        """Return the schedules of the logged-in user's calendar."""
        return self.calendar_connection.get_calendar_list(self.calendar_id)

    def add(self, data: dict):
        """Create a schedule for a visit and run ``trigger`` on it.

        The schedule description is ``data`` as JSON without the ``_uid``
        entry; ``trigger`` receives a copy that still includes it.  The
        caller's dict is left untouched.

        Returns:
            Whatever ``trigger`` returns.
        """
        visit = {key: value for key, value in data.items() if key != "_uid"}
        schedule_name = f"{visit.get('company_name')}[{visit.get('company_code')}] 拜访计划"
        content = json.dumps(visit, ensure_ascii=False)
        self.calendar_connection.create_schedule(self.calendar_id, schedule_name,
                                                 visit.get("visit_time"), visit.get("end_time"),
                                                 self.name, content, visit.get("company_address"))
        return self.trigger(dict(data))

    @abstractmethod
    def trigger(self, data: dict):
        pass

    def delete(self, schedule_id: str):
        """Placeholder: only calls ``trigger`` with an empty payload.

        NOTE(review): ``schedule_id`` is unused and nothing is deleted.
        """
        self.trigger({})


def get_report_ticket(file_path: str, download_name: str):
    """Create a download ticket for a stored file and return its URL.

    Endpoint (POST): https://a.cmdp.cn/v1/cloudfile/file/createTicket

    Request example::

        {
          "appId": "",
          "uri": "s3://usercentre03-dev/user/documents/my document/00002rfdata11792529c9f6e136e282b6416ff20d3f.json",
          "startTime": "2024-03-21 12:12:12",
          "endTime": "",
          "maxTimes": 0,
          "uid": "",
          "name": "1.json",
          "isPublic": false
        }

    Response example::

        {"code": 200, "msg": null, "data": "LCXNDEwla",
         "validations": null, "token": null, "success": true}

    The download URL is the ticket appended to
    ``https://a.cmdp.cn/v1/cloudfile/file/?ticket=``.
    NOTE(review): the original notes give the host as ``ea.cmdp.cn``; confirm
    which one is correct.

    Args:
        file_path: S3 URI of the file.
        download_name: File name presented on download.

    Returns:
        The download URL.  The ticket is not validated, so a failed request
        yields a URL ending in ``None``.
    """
    url = "https://a.cmdp.cn/v1/cloudfile/file/createTicket"
    body = {
        "appId": "",
        "uri": file_path,
        "startTime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
        "endTime": "",
        "maxTimes": 0,
        "uid": "",
        "name": download_name,
        "isPublic": False,
    }
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, json=body, headers=headers)
    ticket = response.json().get("data")
    return f"https://a.cmdp.cn/v1/cloudfile/file/?ticket={ticket}"


class SalesTools:
    """Stateless helpers around the pre-sales HTTP endpoints."""

    @staticmethod
    def get_user_info(security_code: str):
        """List users of a company (synchronous call).

        Endpoint (POST): http://10.0.0.39/cloud/in/api/userQuery

        Request example::

            {"secCode": "000002", "status": 10, "PageSize": 10, "PageNo": 1}
        """
        return cf_api.get_user_info_by_sec(security_code)

    @staticmethod
    def update_user_tag(user_id: str):
        """Tag a user as a listed company with dynamic-token permission.

        Sends the tags ``客户类型_上市公司`` and ``客户权限_动态令牌`` through
        ``CloudFunctionAPI.update_user_tags``.
        """
        cf_api.update_user_tags(user_id, {"客户类型": "上市公司", "客户权限": "动态令牌"})

    @staticmethod
    def get_org_info(security_code: str):
        """Look up general organisation information by security code.

        Endpoint (POST): http://a.cmdp.cn/apm/v1/ability/api000353

        Request example::

            {"_data": {"searchType": "stock", "secCode": "000002"}}

        Returns:
            The ``data`` object of the response, ``{}`` when absent or null.
        """
        url = "http://a.cmdp.cn/apm/v1/ability/api000353"
        body = {
            "_data": {
                "searchType": "stock",
                "secCode": security_code,
            }
        }
        headers = {"Content-Type": "application/json"}
        response = requests.post(url, json=body, headers=headers)
        return response.json().get("data") or {}

    @staticmethod
    def get_report_rule(tags: list):
        """Find the rule file of the due-diligence report.

        Endpoint (POST):
        https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_list/usys/pbcs/release

        The filter selects entries named ``企业售前画像报告`` whose
        ``delStatus`` is 1.  Example tags:
        ``["报告年度_2023", "biz_项目主体类别_单体", "biz_业务场所_深交所"]``.

        Returns:
            The decoded response.
        """
        url = "https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_list/usys/pbcs/release"
        body = {
            "opUserId": "genesys_app",
            "sourceUserId": "genesys_app",
            "filter": "'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.pbcMeta.name' eq '企业售前画像报告'",
            "tags": tags,
        }
        headers = {"Content-Type": "application/json"}
        response = requests.post(url, json=body, headers=headers)
        return response.json()

    @staticmethod
    def get_report_info(user_id: str, security_code: str, rule_path: str):
        """Run the due-diligence report rule file synchronously.

        Endpoint (POST): https://a.cmdp.cn/v1/cmp/cal/fn/PerformRuleFile

        Per the original notes, ``uid`` should be a real user id; that user
        is notified with the result.  A download ticket still has to be
        generated for the produced report.

        Returns:
            ``dumpmsg._out`` of the response, ``{}`` when absent.
        """
        url = "https://a.cmdp.cn/v1/cmp/cal/fn/PerformRuleFile"
        body = {
            "issync": 2,  # synchronous execution
            "ruleobj": rule_path,
            "dataobj": {
                "uid": user_id,
                "_data": {"secCode": security_code},
                "_var": {"config": {"prodName": "企业售前画像报告"}},
            },
        }
        headers = {"Content-Type": "application/json"}
        response = requests.post(url, json=body, headers=headers)
        return response.json().get("dumpmsg", {}).get("_out", {})

    @staticmethod
    def get_regression_testing(eid: str):
        """Return download links of the regression-test reports of an entity.

        For each candidate report period the TEXTID is looked up via
        ``POST https://a.cmdp.cn/cnd/v1/entity/search/cmdb_bfmapping`` with
        the tags ``报告期_{period}`` and ``GE8002005E_{eid}``.  The response
        holds records whose ``tags`` object has keys such as
        ``TEXTID_1219815286``; the part after ``TEXTID_`` is the text id.
        A ticket is generated only when both
        ``announce02/{text_id}_val_01.json`` and ``..._val_01.docx`` exist.

        NOTE(review): the original notes write the path as
        ``announce02/{textId}._val_01.json`` (with a dot); confirm which
        form is correct.

        Returns:
            The concatenated ``period: url`` entries, or ``"暂无"`` when no
            report is available.
        """

        def get_text_id(report_date, entity_id):
            """Return the TEXTID tagged on the matching record, else None."""
            url = "https://a.cmdp.cn/cnd/v1/entity/search/cmdb_bfmapping"
            body = {
                "tags": [
                    f"报告期_{report_date}",
                    f"GE8002005E_{entity_id}",
                ]
            }
            headers = {"Content-Type": "application/json"}
            response = requests.post(url, json=body, headers=headers)
            records = response.json().get("records", [{}])
            # "tags" maps tag strings to booleans; iterating yields the tags.
            tags = records[0].get("tags", {}) if records else []
            text_id = None
            for tag in tags:
                pieces = tag.split("_")
                if pieces[0] == "TEXTID" and len(pieces) > 1:
                    text_id = pieces[1]
            return text_id

        def get_report_datetime():
            """Return the [half-year, annual] report periods to look for."""
            now = time.localtime(time.time())
            year = now.tm_year
            month = now.tm_mon
            if month >= 9:
                return [f"{year}SAR", f"{year - 1}AR"]
            if month >= 4:
                return [f"{year - 1}SAR", f"{year - 1}AR"]
            return [f"{year - 1}SAR", f"{year - 2}AR"]

        def get_report(report_period, tid):
            """Return ``(period, url)``, or ``(None, None)`` if unavailable."""
            if not tid:
                # Always a pair: the caller unpacks the result.
                return None, None
            docx_path = f"announce02/{tid}_val_01.docx"
            json_path = f"announce02/{tid}_val_01.json"
            if file_exists(json_path) and file_exists(docx_path):
                return report_period, get_report_ticket("s3://" + docx_path, "回归测试报告.docx")
            print("file not exist")
            return None, None

        entries = []
        for flag in get_report_datetime():
            text_id = get_text_id(flag, eid)
            f, r = get_report(flag, text_id)
            if r:
                # NOTE(review): this literal spans several lines in the copy
                # under review and looks like HTML markup was stripped from
                # it; restore the tags from the authoritative source.
                entries.append(f"\n\n{f}:\n\n{r}")
        return "".join(entries) if entries else "暂无"

    @staticmethod
    def get_current_product_id():
        """Return the widget id of the current entry product, or ``None``.

        Queries the product directory for the label ``小智智填-定期报告专版``
        tagged with the year from ``_latest_report_year`` and returns
        ``fileInfo.extends.widget.id`` of the first record.  ``None`` is
        returned for a non-200 status, a body ``code`` other than 200, or an
        empty record list.
        """
        year = _latest_report_year()
        url = "https://a.cmdp.cn/cmnd/ndisksvr/ms/v1/dir_listall/usys/products/release"
        body = {
            "opUserId": "genesys_app",
            "sourceUserId": "genesys_app",
            "filter": "'fileInfo.extends.resource.delStatus' eq 1 and 'fileInfo.extends.resource.label' eq '小智智填-定期报告专版'",
            "tags": ["tech_sbom", f"prod_year_{year}", "type_入口", "prod_小智转写"],
        }
        response = requests.post(url, json=body)
        if response.status_code != 200:
            return None
        response_json = response.json()
        if response_json.get("code", -1) != 200:
            return None
        records = response_json.get("data", {}).get("records", [])
        if not records:
            return None
        widget_info = records[0].get("fileInfo", {}).get("extends", {}).get("widget", {})
        return widget_info.get("id")

    @staticmethod
    def get_smart_genie_product(entity_id: str, product_id: str):
        """Order ("print") a product for an entity through the internal API.

        Endpoint (POST): https://cloud.cmdp.cn/api/product/deliver/print

        Request fields: ``eid`` (entity id; per the original notes an empty
        value means the user's own company), ``pid`` (entry product id,
        required) and ``priceType`` (product edition).  Per the notes a
        notification is pushed once printing succeeds.

        NOTE(review): the original notes say the edition is fixed to
        ``标准版`` and show the response key as ``Code``; the code sends
        ``体验版`` and reads ``code``.  Confirm against the service.

        Returns:
            The response's ``data`` (``""`` when absent) if both the HTTP
            status and the body ``code`` are 200, otherwise ``None``.
        """
        url = "https://cloud.cmdp.cn/api/product/deliver/print"
        body = {
            "eid": entity_id,
            "pid": product_id if product_id else "",
            "priceType": "体验版",
        }
        response = requests.post(url, json=body)
        if response.status_code == 200:
            response_json = response.json()
            if response_json.get("code", -1) == 200:
                return response_json.get("data", "")
        return None

    @staticmethod
    def get_product_qrcode(entity_id: str, product_id: str):
        """Return the URI of the QR-code file stored for a delivered product.

        Two calls are made against ``http://10.0.0.39/octopus``:

        1. ``POST /usr/genesys_app/file/query`` with a tag query joining
           ``eid_{eid}``, ``pid_{pid}`` and ``priceType_标准版`` under the
           path ``/system/users``; the response is used as a list of file
           ids.
        2. ``GET /usr/genesys_app/file/{fid}/meta`` for the first id.

        Returns:
            The ``uri`` field of the file metadata, or ``None`` when
            ``product_id`` is empty, nothing matches, or the metadata has no
            ``uri``.
        """

        def get_file_by_fid(file_id):
            """Return the file metadata, ``{}`` on any request failure."""
            url = f"http://10.0.0.39/octopus/usr/genesys_app/file/{file_id}/meta"
            try:
                response = requests.get(url)
                if response.status_code == 200:
                    return response.json()
                return {}
            except (requests.RequestException, ValueError):
                # Best effort: a transport error or an undecodable body is
                # treated as "no metadata".
                return {}

        def get_file_id(entity_id: str, product_id: str):
            """Return the query result, ``[]`` on a non-200 status."""
            url = "http://10.0.0.39/octopus/usr/genesys_app/file/query"
            body = {
                "Category": "default",
                "ownerID": "genesys_app",
                "query": f"\"eid_{entity_id}\" & \"pid_{product_id}\" & \"priceType_标准版\"",
                "path": "/system/users",
                "includeHistory": False,
            }
            response = requests.post(url, json=body)
            if response.status_code == 200:
                return response.json()
            return []

        if not product_id:
            return None
        file_ids = get_file_id(entity_id, product_id)
        if not file_ids:
            return None
        return get_file_by_fid(file_ids[0]).get("uri")

    @staticmethod
    def send_company_mail(to_address: str, subject: str, content: str):
        """Send an HTML mail to one recipient; returns the decoded response."""
        url = "https://a.cmdp.cn/apm/v1/ability/api000357"
        body = {
            "_data": {
                "type": "html",
                "tos": [to_address],
                "subject": subject,
                "text": content,
            }
        }
        return requests.post(url, json=body).json()

    @staticmethod
    def get_qrcode_base64(product_qrcode_path):
        """Download a file and return it base64-encoded.

        Returns ``""`` for an empty path or a non-200 status.
        """
        if not product_qrcode_path:
            return ""
        response = requests.get(product_qrcode_path)
        if response.status_code != 200:
            return ""
        return base64.b64encode(response.content).decode('utf-8')

    @staticmethod
    def get_information_disclosure_picture(year: str, security_code: str):
        """Copy a company's disclosure scorecard image to the static area.

        When the cached image exists and can be downloaded it is uploaded to
        ``/product-data/static/xpcjd/{security_code}_idsr.jpg``.

        Returns:
            Always a string, so the result is safe to pass to
            ``str.replace``.
            NOTE(review): in the copy under review the success branch
            returns an empty f-string; markup referencing the static image
            appears to have been stripped.  Restore it from the
            authoritative source.
        """
        basiceg_url = "http://a.cmdp.cn/basiceg/v1/bytes"
        file_path = f"/usercentre03-dev/user/cache/{year}/信披成绩单/{security_code}_idsr.jpg"
        static_path = f"/product-data/static/xpcjd/{security_code}_idsr.jpg"
        if file_exists(file_path):
            resp = requests.get(basiceg_url + file_path)
            if resp.status_code == 200:
                requests.put(basiceg_url + static_path, data=resp.content)
                return ""
        return ""


st = SalesTools()


class CalendarEventDB(CalendarDB):
    """Calendar facade that prepares visit material when a visit is added."""

    def _transfer_calendar_data(self, data: dict) -> dict:
        pass

    def trigger(self, data: dict):
        """Prepare and distribute the material for one planned visit.

        Steps: tag the company's users, run the pre-sales report for the
        logged-in user, collect regression-test links, fill
        ``template.html``, mail it to the logged-in user and append a row to
        the visit smart sheet.

        Args:
            data: Visit payload; reads ``company_code``, ``company_name``,
                ``company_address``, ``visit_time``, ``end_time``, ``_uid``.

        Raises:
            KeyError: If one of the keys above is missing when the
                smart-sheet row is built.
        """
        security_code = data.get("company_code")
        user_list = st.get_user_info(security_code)

        # Company information.
        company_info = st.get_org_info(security_code)
        eid = company_info.get("eid")
        full_name = company_info.get("fullName")

        # Tag every known user of the company; placeholder entries without
        # an id (e.g. the ``[{}]`` fallback) are skipped.
        for user in user_list or []:
            user_id = user.get("id")
            if user_id:
                st.update_user_tag(user_id)

        # The logged-in user receives the report and the mail.
        bp_info = self.cloud_connection.get_user_info_by_name(self.name)
        bp_id = bp_info.get("id")
        bp_mail = bp_info.get("email")

        # Locate the report rule file and run the report.
        year = _latest_report_year()
        report_tags = [f"报告年度_{year}", "biz_项目主体类别_单体", "biz_业务场所_深交所"]
        report_rule_info = st.get_report_rule(report_tags)
        records = (report_rule_info.get("data") or {}).get("records")
        report_rule_path = records[0].get("filePath", "") if records else ""
        report_info = st.get_report_info(bp_id, security_code, report_rule_path)
        # str.replace needs a string even when the run produced no path.
        report_ticket = (report_info.get("data") or {}).get("path") or ""

        regression_testing = st.get_regression_testing(eid)

        with open("template.html", "r", encoding="utf-8") as f:
            content = f.read()
        content = content.replace("{{report_ticket}}", report_ticket)
        content = content.replace("{{regression_testing}}", regression_testing)
        # NOTE(review): the scorecard year is hard-coded to "2024".
        content = content.replace("{{picture}}",
                                  st.get_information_disclosure_picture("2024", security_code))
        st.send_company_mail(bp_mail, f"客户拜访准备文件-{full_name}[{security_code}]", content)

        ssp.add_row(doc_id=f_id, sheet_id="tTMuVB", records=[
            {
                "values": {
                    "证券代码": [{"type": "text", "text": data["company_code"]}],
                    "证券简称": [{"type": "text", "text": data["company_name"]}],
                    "地点": [{"type": "text", "text": data["company_address"]}],
                    # Second timestamps are widened to milliseconds.
                    "拜访时间": f"{data['visit_time']}000",
                    "结束时间": f"{data['end_time']}000",
                    "拜访人": [{"type": "text", "text": data["_uid"]}],
                }
            }])


ceb = CalendarEventDB()


def get_approval(template_id):
    """Count this week's approved visits per applicant and company.

    Returns:
        ``{userid: {"name": display_name, company_code: visit_count, ...}}``.
        The display name falls back to the userid when the member lookup
        returns no ``name``.
    """
    field_mapping = {
        "Number-1697103952657": "company_code",
        "Text-1697103919621": "company_name",
        "Text-1728374569134": "company_address",
        "Date-1728370374837": "visit_time",
        "Date-1728370485478": "end_time",
    }
    info = {}
    for approval_id in wapi.get_approval_list_weekly(template_id):
        detail = wapi.get_approval_info(approval_id)
        approval_info = parse_data(detail["info"], field_mapping)
        _uid = approval_info["_uid"]
        if _uid not in info:
            _u = wup.get_user_info(_uid)
            info[_uid] = {"name": _u.get("name", _uid)}
        # Count every approval, including the first one seen for a user.
        visits = info[_uid]
        company_code = approval_info["company_code"]
        visits[company_code] = visits.get(company_code, 0) + 1
    return info


def generate_approval_report_html(times_report):
    """Render the result of ``get_approval`` as report text.

    The applicant name is emitted only on the first row of each applicant.

    NOTE(review): in the copy under review the header/footer literals and
    the row templates contain no markup (the header text sits in the footer
    literal); the HTML tags appear to have been stripped.  The literals
    below reproduce what is visible - restore the markup from the
    authoritative source.
    """
    htms = " "
    htme = "\n姓名公司代码次数\n"
    rows = []
    for key_report in times_report.values():
        name = key_report["name"]
        counts = [(code, count) for code, count in key_report.items() if code != "name"]
        for idx, (code, count) in enumerate(counts):
            if idx == 0:
                rows.append(f"{name}{code}{count}")
            else:
                rows.append(f"{code}{count}")
    return htms + "".join(rows) + htme


def new_product_print(info):
    """Request a customised product print; returns the decoded response.

    Args:
        info: Must contain ``user_id``, ``company_code``, ``service_name``
            and ``_uid``.
    """
    url = "https://cloud.cmdp.cn/api/product/print/customize"
    body = {
        "userId": info["user_id"],
        "secCode": info["company_code"],
        "prodName": info["service_name"],
        "opUserId": info["_uid"],
    }
    r = requests.post(url, json=body)
    return r.json()