import datetime import pytz import requests import time def convert_to_utc(time_str, time_format='%Y-%m-%d %H:%M:%S', source_timezone=None): # 解析时间字符串为datetime对象 local_dt = datetime.datetime.strptime(time_str, time_format) # 如果指定了源时区,则将本地时间转换为UTC时间 if source_timezone: tz = pytz.timezone(source_timezone) local_dt = tz.localize(local_dt) utc_dt = local_dt.astimezone(pytz.UTC) else: # 如果没有指定源时区,则假设时间字符串已经是UTC时间 utc_dt = pytz.UTC.localize(local_dt) return utc_dt def push_message(title: str, message: str, url: str): url = "https://api.chuckfang.com/Eirf" resp = requests.post(url, json={"title": title, "msg": message, "url": url}) def transform_data(data, mapping): temp = {} for k, v in mapping.items(): if k in data: if k == "date": utc_time = convert_to_utc(data[k], source_timezone='Asia/Shanghai').timestamp() timestamp = str(int(utc_time)) temp[v] = timestamp + "000" else: temp[v] = data[k] return temp class SmartSheet: def __init__(self, doc_id, sheet_id): self.doc_id = doc_id self.sheet_id = sheet_id self._columns = None self._data = None @staticmethod def _post(url, body): resp = requests.post(url, json=body) if resp.status_code == 200: return resp.json() return {} def _get_columns(self): url = "https://a.cmdp.cn/umss/v1/wxw/common/submit" body = { "url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_fields", "method": "POST", "body": { "docid": self.doc_id, "sheet_id": self.sheet_id } } response = self._post(url, body) if response: fields = response["fields"] return fields return [] def _get_data(self, filters: dict, cursor: int = 0, limit: int = 1000): url = "https://a.cmdp.cn/umss/v1/wxw/smartSheet/getRecords" body = { "docid": self.doc_id, "sheet_id": self.sheet_id, "filter_spec": filters, "offset": cursor, "limit": limit } response = self._post(url, body) if response: cursor = response["next"] records = response["records"] return cursor, records return 0, [] def _add(self, records: list): url = "https://a.cmdp.cn/umss/v1/wxw/smartSheet/addRecords" body = { "docid": self.doc_id, "sheet_id": self.sheet_id, "key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE", "records": records } response = self._post(url, body) if response: return True return False def add_fields(self, fields: list): url = "https://a.cmdp.cn/umss/v1/wxw/common/submit" body = { "url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/add_fields", "method": "POST", "body": { "docid": self.doc_id, "sheet_id": self.sheet_id, "fields": fields } } return self._post(url, body) def delete_field(self, field_ids: list): url = "https://a.cmdp.cn/umss/v1/wxw/common/submit" body = { "url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/delete_fields", "method": "POST", "body": { "docid": self.doc_id, "sheet_id": self.sheet_id, "field_ids": field_ids } } return self._post(url, body) def transform_data(self, data: dict): result = {} columns = self.columns for i in data: type_info = columns[i] i_type = type_info.get("field_type") if i_type == "FIELD_TYPE_TEXT": result[i] = [ { "text": data[i], "type": "text" } ] elif i_type == "FIELD_TYPE_SINGLE_SELECT": options = {i["text"]: i for i in type_info['property_single_select']["options"]} result[i] = [ options[data[i]] ] else: result[i] = data[i] return result def convert_data(self, data: dict): va = {} for v in data: type_info = self.columns[v] i_type = type_info.get("field_type") if i_type == "FIELD_TYPE_TEXT": va[v] = "".join([m["text"] for m in data[v]]) elif i_type == "FIELD_TYPE_DATE_TIME": va[v] = 0 if data[v] is None else data[v] elif i_type == "FIELD_TYPE_NUMBER": va[v] = 0 if data[v] is None else data[v] elif i_type == "FIELD_TYPE_SELECT": va[v] = [m["text"] for m in data[v]] elif i_type == "FIELD_TYPE_SINGLE_SELECT": va[v] = data[v][0]["text"] if data[v] else "" elif i_type == "FIELD_TYPE_TWOWAYLINKRECORDS": va[v] = data[v] elif i_type == "FIELD_TYPE_AUTONUMBER": va[v] = data[v]["text"] if data[v] else "" elif i_type == "FIELD_TYPE_CHECKBOX": va[v] = data[v] elif i_type == "FIELD_TYPE_REFERENCE": va[v] = data[v] else: print(i_type, data[v], type_info) return va @property def columns(self): if self._columns is None: self._columns = self._get_columns() return {i["field_title"]: i for i in self._columns} @property def data(self): if self._data is None: self._data = self.search({}) return self._data def search(self, filters: dict): cursor = 0 result = [] while True: cursor, records = self._get_data(filters, cursor=cursor) result += records if cursor == 0: break return result def add(self, data: dict): return self._add([{"values": self.transform_data(data)}]) def batch_add(self, data_list: list): return self._add([{"values": self.transform_data(i)} for i in data_list]) def to_json(self): data = [] for i in self.data: values = i["values"] va = self.convert_data(values) va['_rid'] = i["record_id"] data.append(va) return {"columns": self.columns, "data": data} class SmartMetadata(SmartSheet): def __init__(self, doc_id, sheet_id=None): super().__init__(doc_id, sheet_id) def create(self): url = "https://a.cmdp.cn/umss/v1/wxw/addSheet" body = { "docid": self.doc_id, "properties": { "title": "Metadata" } } resp = self._post(url, body) if resp.get("properties"): self.sheet_id = resp["properties"]["sheet_id"] self.add_fields([ { "field_type": "FIELD_TYPE_TEXT", "field_title": "名称" }, { "property_single_select": { "options": [{"style": 13, "text": "doc"}, {"style": 7, "text": "sheet"}, {"style": 11, "text": "metadata"} ], "is_multiple": False, "is_quick_add": True }, "field_type": "FIELD_TYPE_SINGLE_SELECT", "field_title": "类型" }, { "field_type": "FIELD_TYPE_TEXT", "field_title": "值" } ]) self.delete_field([self.columns[i]["field_id"] for i in self.columns if i not in ["名称", "类型", "值"]]) self.update(self.sheet_id, sheet_name="Metadata", sheet_type="metadata") def update(self, sheet_id, sheet_name, sheet_type="sheet"): self.add({"名称": sheet_name, "类型": sheet_type, "值": sheet_id}) class SmartTableApi: def __init__(self, doc_id=None): self.doc_id = doc_id self._metadata = None self.metadata_table = None if self.doc_id is not None: self._get_sheet_list() @staticmethod def _post(url, body): resp = requests.post(url, json=body) if resp.status_code == 200: return resp.json() return {} def create(self, doc_name: str, admin_users: list): url = "https://a.cmdp.cn/umss/v1/wxw/addDocument" body = { "doc_type": 10, "doc_name": doc_name, "admin_users": admin_users } resp = self._post(url, body) self.doc_id = resp.get("docid") self._get_sheet_list() self.metadata_table.update(self.doc_id, sheet_name=doc_name, sheet_type="doc") def _get_sheet_list(self): url = "https://a.cmdp.cn/umss/v1/wxw/common/submit" body = { "url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_sheet", "method": "POST", "body": { "docid": self.doc_id } } resp = self._post(url, body) sheet_list = resp.get("sheet_list", []) result = {i["title"]: i["sheet_id"] for i in sheet_list} if "Metadata" in result: metadata_id = result.get("Metadata") self.metadata_table = SmartMetadata(self.doc_id, metadata_id) del result["Metadata"] else: self.metadata_table = SmartMetadata(self.doc_id) self.metadata_table.create() self._metadata = result @property def metadata(self): return self._metadata def _add_sheet(self, sheet_name): url = "https://a.cmdp.cn/umss/v1/wxw/addSheet" body = { "docid": self.doc_id, "properties": { "title": sheet_name } } return self._post(url, body) def add_sheet(self, sheet_name): resp = self._add_sheet(sheet_name) if resp.get("properties"): self.metadata_table.update(sheet_id=resp["properties"]["sheet_id"], sheet_name=sheet_name) return resp def get_sheet(self, sheet_id): return SmartSheet(self.doc_id, sheet_id) def get_sheet_by_name(self, sheet_name): if sheet_name in self.metadata: return self.get_sheet(self.metadata[sheet_name]) else: return None class ESmart: def __init__(self, time_out: int = 12 * 60 * 60): self.time_out = time_out self.timestamp = int(time.time()) self.report_sheet = SmartSheet( "dc48YTUb9gNYCpIWg5v90-U5HIWT72bm3Fkw9Jvi33_sP3I0H32QEDFNOf69TS1bbUjdsYLmvArqqUootD34nm8Q", "L1wFol") self.user_sheet = SmartSheet( "dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw", "8vVxEx") self.notice_sheet = SmartSheet( "dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw", "2943X5") self.user_log_sheet = SmartSheet( "dcqzSDHPl2ZWoLkEB3Id1wMMaW0meMSwGLjfnuHD0VMh0DVidKdal-3wYfsh7Zb1pGn9moDuzjvhtlRZAXoO-Hjg", "g6dTI2") self.base_sheet = SmartSheet( "dcqzSDHPl2ZWoLkEB3Id1wMMaW0meMSwGLjfnuHD0VMh0DVidKdal-3wYfsh7Zb1pGn9moDuzjvhtlRZAXoO-Hjg", "1gTulR") self._base_data = None self._user_info = None self._security_mapping = None @staticmethod def _convert_user_info(data): _data = {} for i in data["data"]: if "来源对象ID#eid" in i: v = { "eid": i["来源对象ID#eid"], "name": i["名称#name"], "value": i["数值#value"], "type": i["类型"], "_rid": i["_rid"], } k = v["name"].split("_") k = k[0] if len(k) > 0 else "" _data[k] = v return _data @property def now(self): now = datetime.datetime.now() # 创建一个表示北京时区的对象 beijing = pytz.timezone('Asia/Shanghai') # 将当前时间转换为北京时区 return now.astimezone(beijing) @property def today(self): return self.now.strftime("%Y-%m-%d") @property def yesterday(self): return (self.now - datetime.timedelta(days=1)).strftime("%Y-%m-%d") @property def base_data(self): if self._base_data is None: self._base_data = self.base_sheet.to_json() if self._check_timeout(): self._base_data = self.base_sheet.to_json() return self._base_data @property def user_info(self): if self._user_info is None: self._user_info = self.user_sheet.to_json() if self._check_timeout(): self._user_info = self.user_sheet.to_json() return self._user_info @property def security_mapping(self): _data = {} if self._security_mapping and self._check_timeout(): return self._security_mapping else: u_mapping = self._convert_user_info(self.user_info) for i in self.base_data["data"]: k = i["深交所上市公司代码"] fpic = i.get("跟进负责人", []) # Follow-up person in charge v = [] if fpic: for p in fpic: pid = u_mapping.get(p, {}).get('_rid') if pid: v.append(pid) _data[k] = v self._security_mapping = _data return self._security_mapping def _check_timeout(self): now = int(time.time()) if now - self.timestamp > self.time_out: print("refresh") self.timestamp = now return True else: return False def refresh(self): self._base_data = self.base_sheet.to_json() self._user_info = self.user_sheet.to_json() def generate_message(self, program: str, title: str, subject: str, content: str, tos: list, msg_type: str = "中性", top: bool = False): data = { "*发布日期": f"{self.today}", "*栏目#gName": program, "状态": "待发布", "*摘要#subject": subject, "置顶#isTop": top, "*详情#content": content, "*标题#title": title, "消息类型#msgType": msg_type, "*消息接收对象#sendList": tos } return data def using_info_message(self, data): code = data.get('公司代码') if code: name = data.get('公司名称', "") product_name = data.get('产品', "") using_count = data.get('使用动态口令次数', 0) user_name = data.get('用户名称', "") using_time = data.get('时间', "") using_time = datetime.datetime.fromtimestamp(int(using_time[:-3])) if using_time else using_time tos = self.security_mapping.get(code, []) return self.generate_message("客户动态", f"跟进客户使用产品情况({self.yesterday})", "点击可查看您所跟进的上市公司客户的产品动态口令获取记录。", f"|证券代码 |公司名称 |产品 |用户|使用时间|动态口令次数 |\n|:--|\n|{code} |{name} |{product_name} |{user_name}|{using_time}| {using_count} |\n", tos) return {} def report_info_message(self, data): code = data.get('代码') if code: name = data.get('简称', "") title = data.get('公告标题', "") report_type = data.get('公告类型', "") report_time = data.get('公告时间', "") report_uri = data.get('公告链接', "") tos = self.security_mapping.get(code, []) return self.generate_message("客户动态", f"跟进客户的公告披露信息({self.yesterday})", "点击可查看您所跟进的上市公司客户的公告披露信息。", f'|证券代码 |证券简称 |公告标题 |公告类型 |公告时间 |公告链接 |\n|:--|\n|{code} |{name} |{title} |{report_type} |{report_time}|{report_uri} | \n', tos) return {} def prepared_message(self): count = 0 messages = [] for i in self.report_sheet.search({ "conjunction": "CONJUNCTION_AND", "conditions": [{ "field_id": "fjVQAh", "field_type": "FIELD_TYPE_CREATED_TIME", "operator": "OPERATOR_IS", "date_time_value": { "type": "DATE_TIME_TYPE_YESTERDAY" } }] }): data = self.report_sheet.convert_data(i["values"]) message = self.report_info_message(data) if message: count += 1 messages.append(message) for i in self.user_log_sheet.search({ "conjunction": "CONJUNCTION_AND", "conditions": [{ "field_id": "fvJi8f", "field_type": "FIELD_TYPE_CREATED_TIME", "operator": "OPERATOR_IS", "date_time_value": { "type": "DATE_TIME_TYPE_YESTERDAY" } }] }): data = self.user_log_sheet.convert_data(i["values"]) message = self.using_info_message(data) if message: count += 1 messages.append(message) self.notice_sheet.batch_add(messages) push_message("推送消息通知", f"今天是{self.today} 本次预计计推送{count}条消息", "") @staticmethod def push(): resp = requests.get("https://cloud.cmdp.cn/wapi/recommend_message_push") if resp.status_code == 200: return resp.json() return {}