Files
Esmart/esmart.py
2025-09-28 15:48:47 +08:00

530 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import pytz
import requests
import time
def convert_to_utc(time_str, time_format='%Y-%m-%d %H:%M:%S', source_timezone=None):
    """Parse ``time_str`` and return it as a timezone-aware UTC datetime.

    Args:
        time_str: Text to parse with ``datetime.strptime``.
        time_format: ``strptime`` format describing ``time_str``.
        source_timezone: Optional pytz zone name. When given, the parsed
            wall-clock time is interpreted in that zone and converted to UTC.
            When omitted, the parsed time is taken to be UTC already.

    Returns:
        A datetime carrying ``pytz.UTC`` as its tzinfo.

    Raises:
        ValueError: If ``time_str`` does not match ``time_format``.
    """
    naive = datetime.datetime.strptime(time_str, time_format)
    if not source_timezone:
        # No source zone given: the string is assumed to be UTC already.
        return pytz.UTC.localize(naive)
    # Attach the source zone first, then shift the instant to UTC.
    zone = pytz.timezone(source_timezone)
    return zone.localize(naive).astimezone(pytz.UTC)
def push_message(title: str, message: str, url: str):
    """Send a push notification through the fixed notification endpoint.

    Args:
        title: Notification title, sent as ``title``.
        message: Notification body, sent as ``msg``.
        url: Value sent in the payload's ``url`` field.

    The endpoint address is kept in its own local so that it no longer
    overwrites the ``url`` argument; previously the payload always carried
    the endpoint address instead of the caller's value.
    """
    endpoint = "https://api.chuckfang.com/Eirf"
    requests.post(endpoint, json={"title": title, "msg": message, "url": url})
def transform_data(data, mapping):
    """Rename the keys of ``data`` according to ``mapping``.

    Only keys present in both ``mapping`` and ``data`` are copied. The value
    stored under the source key ``"date"`` is parsed as an Asia/Shanghai
    wall-clock time and replaced by its epoch time as a string with ``"000"``
    appended to the whole seconds.

    Args:
        data: Source dictionary.
        mapping: ``{source_key: target_key}`` pairs.

    Returns:
        A new dictionary keyed by the target names.
    """
    renamed = {}
    for source_key, target_key in mapping.items():
        if source_key not in data:
            continue
        value = data[source_key]
        if source_key == "date":
            seconds = int(convert_to_utc(value, source_timezone='Asia/Shanghai').timestamp())
            value = str(seconds) + "000"
        renamed[target_key] = value
    return renamed
class SmartSheet:
    """Access one sheet of a smart-sheet document through HTTP endpoints.

    Field definitions and records are fetched lazily and cached on the
    instance (``columns`` and ``data``).
    """

    def __init__(self, doc_id, sheet_id):
        self.doc_id = doc_id
        self.sheet_id = sheet_id
        # Raw field list as returned by the API; filled on first `columns` access.
        self._columns = None
        # Raw record list; filled on first `data` access.
        self._data = None

    @staticmethod
    def _post(url, body):
        """POST ``body`` as JSON; return the decoded reply, or ``{}`` unless HTTP 200."""
        resp = requests.post(url, json=body)
        if resp.status_code == 200:
            return resp.json()
        return {}

    def _submit(self, target_url, payload):
        """Send a request description (target url, method, body) to the common/submit endpoint."""
        envelope = {
            "url": target_url,
            "method": "POST",
            "body": payload
        }
        return self._post("https://a.cmdp.cn/umss/v1/wxw/common/submit", envelope)

    def _get_columns(self):
        """Return the sheet's field definitions, or ``[]`` when none are reported.

        A reply without a ``fields`` entry (for instance an error body that
        arrives with HTTP 200) yields an empty list instead of a ``KeyError``.
        """
        response = self._submit(
            "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_fields",
            {"docid": self.doc_id, "sheet_id": self.sheet_id},
        )
        return response.get("fields") or []

    def _get_data(self, filters: dict, cursor: int = 0, limit: int = 1000):
        """Fetch one page of records.

        Args:
            filters: Sent as ``filter_spec``.
            cursor: Sent as ``offset``.
            limit: Maximum number of records requested.

        Returns:
            ``(next_cursor, records)``. ``next_cursor`` is ``0`` when the
            request failed or the reply carries no ``next`` value; ``search``
            treats ``0`` as the end of the result set.
        """
        url = "https://a.cmdp.cn/umss/v1/wxw/smartSheet/getRecords"
        body = {
            "docid": self.doc_id,
            "sheet_id": self.sheet_id,
            "filter_spec": filters,
            "offset": cursor,
            "limit": limit
        }
        response = self._post(url, body)
        if not response:
            return 0, []
        # Missing keys fall back to "no more pages" / "no records".
        return response.get("next") or 0, response.get("records") or []

    def _add(self, records: list):
        """Submit ``records`` for insertion; return True when the reply is non-empty.

        NOTE(review): any non-empty reply counts as success, so an error body
        delivered with HTTP 200 would also return True - confirm the
        endpoint's error format.
        """
        url = "https://a.cmdp.cn/umss/v1/wxw/smartSheet/addRecords"
        body = {
            "docid": self.doc_id,
            "sheet_id": self.sheet_id,
            "key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
            "records": records
        }
        return bool(self._post(url, body))

    def add_fields(self, fields: list):
        """Request creation of ``fields`` on this sheet and return the decoded reply."""
        return self._submit(
            "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/add_fields",
            {"docid": self.doc_id, "sheet_id": self.sheet_id, "fields": fields},
        )

    def delete_field(self, field_ids: list):
        """Request deletion of the fields in ``field_ids`` and return the decoded reply."""
        return self._submit(
            "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/delete_fields",
            {"docid": self.doc_id, "sheet_id": self.sheet_id, "field_ids": field_ids},
        )

    def transform_data(self, data: dict):
        """Convert plain ``{field_title: value}`` data into cell values for ``_add``.

        Text fields are wrapped in a single text segment, single-select fields
        are replaced by the matching option definition, and every other field
        type is passed through unchanged.

        Raises:
            KeyError: If a title is not a known column, or a single-select
                value matches none of the field's options.
        """
        columns = self.columns
        result = {}
        for title, value in data.items():
            type_info = columns[title]
            field_type = type_info.get("field_type")
            if field_type == "FIELD_TYPE_TEXT":
                result[title] = [{"text": value, "type": "text"}]
            elif field_type == "FIELD_TYPE_SINGLE_SELECT":
                options = {
                    option["text"]: option
                    for option in type_info['property_single_select']["options"]
                }
                result[title] = [options[value]]
            else:
                result[title] = value
        return result

    def convert_data(self, data: dict):
        """Flatten one record's cell values into plain Python values.

        Field types that are not handled are printed and left out of the
        result.

        Raises:
            KeyError: If a title in ``data`` is not a known column.
        """
        # Read the column map once; the property rebuilds it on every access.
        columns = self.columns
        passthrough_types = (
            "FIELD_TYPE_TWOWAYLINKRECORDS",
            "FIELD_TYPE_CHECKBOX",
            "FIELD_TYPE_REFERENCE",
        )
        flat = {}
        for title, cell in data.items():
            type_info = columns[title]
            field_type = type_info.get("field_type")
            if field_type == "FIELD_TYPE_TEXT":
                flat[title] = "".join([segment["text"] for segment in cell])
            elif field_type in ("FIELD_TYPE_DATE_TIME", "FIELD_TYPE_NUMBER"):
                flat[title] = 0 if cell is None else cell
            elif field_type == "FIELD_TYPE_SELECT":
                flat[title] = [option["text"] for option in cell]
            elif field_type == "FIELD_TYPE_SINGLE_SELECT":
                flat[title] = cell[0]["text"] if cell else ""
            elif field_type == "FIELD_TYPE_AUTONUMBER":
                flat[title] = cell["text"] if cell else ""
            elif field_type in passthrough_types:
                flat[title] = cell
            else:
                print(field_type, cell, type_info)
        return flat

    @property
    def columns(self):
        """Field definitions keyed by ``field_title`` (raw list fetched once)."""
        if self._columns is None:
            self._columns = self._get_columns()
        return {field["field_title"]: field for field in self._columns}

    @property
    def data(self):
        """All records of the sheet, fetched once with an empty filter."""
        if self._data is None:
            self._data = self.search({})
        return self._data

    def search(self, filters: dict):
        """Return every record matching ``filters``, following pages until the cursor is 0."""
        cursor = 0
        result = []
        while True:
            cursor, records = self._get_data(filters, cursor=cursor)
            result += records
            if cursor == 0:
                break
        return result

    def add(self, data: dict):
        """Insert one record given as ``{field_title: value}``."""
        return self._add([{"values": self.transform_data(data)}])

    def batch_add(self, data_list: list):
        """Insert several records, each given as ``{field_title: value}``."""
        return self._add([{"values": self.transform_data(item)} for item in data_list])

    def to_json(self):
        """Return ``{"columns": ..., "data": [...]}`` with flattened records.

        Each flattened record additionally carries its ``record_id`` under
        the key ``_rid``.
        """
        rows = []
        for record in self.data:
            row = self.convert_data(record["values"])
            row['_rid'] = record["record_id"]
            rows.append(row)
        return {"columns": self.columns, "data": rows}
class SmartMetadata(SmartSheet):
    """The "Metadata" sheet of a document: one row per registered doc or sheet."""

    def __init__(self, doc_id, sheet_id=None):
        # sheet_id may be omitted; create() fills it in once the sheet exists.
        super().__init__(doc_id, sheet_id)

    def create(self):
        """Create the "Metadata" sheet, set up its columns and register it in itself.

        Does nothing further when the creation reply carries no ``properties``,
        so no field or record request is ever sent with ``sheet_id`` unset.
        """
        url = "https://a.cmdp.cn/umss/v1/wxw/addSheet"
        body = {
            "docid": self.doc_id,
            "properties": {
                "title": "Metadata"
            }
        }
        properties = self._post(url, body).get("properties")
        if not properties:
            return
        self.sheet_id = properties["sheet_id"]
        # NOTE(review): the third field title (also used in the keep-list below
        # and in update()) is an empty string in this source; the file was
        # flagged for ambiguous Unicode characters, so confirm the literal.
        self.add_fields([
            {
                "field_type": "FIELD_TYPE_TEXT",
                "field_title": "名称"
            },
            {
                "property_single_select": {
                    "options": [{"style": 13, "text": "doc"},
                                {"style": 7, "text": "sheet"},
                                {"style": 11, "text": "metadata"}
                                ],
                    "is_multiple": False,
                    "is_quick_add": True
                },
                "field_type": "FIELD_TYPE_SINGLE_SELECT",
                "field_title": "类型"
            }, {
                "field_type": "FIELD_TYPE_TEXT",
                "field_title": ""
            }
        ])
        # Read the column map once instead of rebuilding it per element, then
        # delete every column other than the three added above.
        columns = self.columns
        keep = ["名称", "类型", ""]
        self.delete_field([info["field_id"] for title, info in columns.items() if title not in keep])
        self.update(self.sheet_id, sheet_name="Metadata", sheet_type="metadata")

    def update(self, sheet_id, sheet_name, sheet_type="sheet"):
        """Append a row recording ``sheet_name``, ``sheet_type`` and ``sheet_id``."""
        self.add({"名称": sheet_name, "类型": sheet_type, "": sheet_id})
class SmartTableApi:
    """Document-level entry point: create documents, list and add sheets.

    When a ``doc_id`` is given, the sheet list is loaded on construction and a
    "Metadata" sheet is created if the document does not report one.
    """

    def __init__(self, doc_id=None):
        self.doc_id = doc_id
        # {sheet title: sheet_id} without the "Metadata" sheet; None until loaded.
        self._metadata = None
        self.metadata_table = None
        if self.doc_id is not None:
            self._get_sheet_list()

    @staticmethod
    def _post(url, body):
        """POST ``body`` as JSON; return the decoded reply, or ``{}`` unless HTTP 200."""
        resp = requests.post(url, json=body)
        if resp.status_code == 200:
            return resp.json()
        return {}

    def create(self, doc_name: str, admin_users: list):
        """Create a document, load its sheets and register it in the metadata sheet.

        NOTE(review): if the reply has no ``docid`` the following calls run
        with ``doc_id`` set to None - confirm how callers expect that failure
        to surface.
        """
        url = "https://a.cmdp.cn/umss/v1/wxw/addDocument"
        body = {
            "doc_type": 10,
            "doc_name": doc_name,
            "admin_users": admin_users
        }
        resp = self._post(url, body)
        self.doc_id = resp.get("docid")
        self._get_sheet_list()
        self.metadata_table.update(self.doc_id, sheet_name=doc_name, sheet_type="doc")

    def _get_sheet_list(self):
        """Load ``{title: sheet_id}`` for the document and bind the metadata sheet.

        An existing "Metadata" sheet is wrapped and removed from the mapping;
        otherwise a new one is created.

        NOTE(review): a failed request yields an empty list, which is
        indistinguishable here from "no Metadata sheet" and therefore triggers
        creation - confirm this is acceptable.
        """
        url = "https://a.cmdp.cn/umss/v1/wxw/common/submit"
        body = {
            "url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_sheet",
            "method": "POST",
            "body": {
                "docid": self.doc_id
            }
        }
        resp = self._post(url, body)
        sheets = {item["title"]: item["sheet_id"] for item in resp.get("sheet_list", [])}
        if "Metadata" in sheets:
            self.metadata_table = SmartMetadata(self.doc_id, sheets.pop("Metadata"))
        else:
            self.metadata_table = SmartMetadata(self.doc_id)
            self.metadata_table.create()
        self._metadata = sheets

    @property
    def metadata(self):
        """The ``{sheet title: sheet_id}`` mapping, or None when nothing was loaded."""
        return self._metadata

    def _add_sheet(self, sheet_name):
        """Request a new sheet titled ``sheet_name`` and return the decoded reply."""
        url = "https://a.cmdp.cn/umss/v1/wxw/addSheet"
        body = {
            "docid": self.doc_id,
            "properties": {
                "title": sheet_name
            }
        }
        return self._post(url, body)

    def add_sheet(self, sheet_name):
        """Create a sheet and, on success, record it in the metadata sheet.

        The new sheet is also added to the local title mapping so that
        ``get_sheet_by_name`` can find it without reloading the sheet list.

        Returns:
            The decoded reply of the creation request.
        """
        resp = self._add_sheet(sheet_name)
        properties = resp.get("properties")
        if properties:
            sheet_id = properties["sheet_id"]
            self.metadata_table.update(sheet_id=sheet_id, sheet_name=sheet_name)
            if self._metadata is not None:
                self._metadata[sheet_name] = sheet_id
        return resp

    def get_sheet(self, sheet_id):
        """Return a ``SmartSheet`` for ``sheet_id`` in this document."""
        return SmartSheet(self.doc_id, sheet_id)

    def get_sheet_by_name(self, sheet_name):
        """Return the sheet titled ``sheet_name``, or None when it is unknown.

        Also returns None when no sheet list has been loaded yet (instance
        built without a ``doc_id``), instead of raising ``TypeError``.
        """
        known = self.metadata or {}
        if sheet_name in known:
            return self.get_sheet(known[sheet_name])
        return None
class ESmart:
    """Build notice messages from yesterday's report and user-log records.

    Base data and user info are cached for ``time_out`` seconds. Once the
    timeout elapses, every cache (including the derived security mapping) is
    dropped together and reloaded on next access.
    """

    def __init__(self, time_out: int = 12 * 60 * 60):
        self.time_out = time_out
        # Epoch seconds of the last cache reset; compared in _check_timeout().
        self.timestamp = int(time.time())
        self.report_sheet = SmartSheet(
            "dc48YTUb9gNYCpIWg5v90-U5HIWT72bm3Fkw9Jvi33_sP3I0H32QEDFNOf69TS1bbUjdsYLmvArqqUootD34nm8Q", "L1wFol")
        self.user_sheet = SmartSheet(
            "dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw",
            "8vVxEx")
        self.notice_sheet = SmartSheet(
            "dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw", "2943X5")
        self.user_log_sheet = SmartSheet(
            "dcqzSDHPl2ZWoLkEB3Id1wMMaW0meMSwGLjfnuHD0VMh0DVidKdal-3wYfsh7Zb1pGn9moDuzjvhtlRZAXoO-Hjg", "g6dTI2")
        self.base_sheet = SmartSheet(
            "dcqzSDHPl2ZWoLkEB3Id1wMMaW0meMSwGLjfnuHD0VMh0DVidKdal-3wYfsh7Zb1pGn9moDuzjvhtlRZAXoO-Hjg", "1gTulR")
        self._base_data = None
        self._user_info = None
        self._security_mapping = None

    @staticmethod
    def _convert_user_info(data):
        """Index user rows by the part of their name before the first ``_``.

        Rows without the ``来源对象ID#eid`` key are ignored. Later rows
        overwrite earlier ones that share the same key.
        """
        users = {}
        for row in data["data"]:
            if "来源对象ID#eid" not in row:
                continue
            entry = {
                "eid": row["来源对象ID#eid"],
                "name": row["名称#name"],
                "value": row["数值#value"],
                "type": row["类型"],
                "_rid": row["_rid"],
            }
            # str.split always yields at least one element, so [0] is safe.
            users[entry["name"].split("_")[0]] = entry
        return users

    @property
    def now(self):
        """Current time as an aware datetime in the Asia/Shanghai zone."""
        return datetime.datetime.now(pytz.timezone('Asia/Shanghai'))

    @property
    def today(self):
        """Today's date in Asia/Shanghai, formatted ``YYYY-MM-DD``."""
        return self.now.strftime("%Y-%m-%d")

    @property
    def yesterday(self):
        """Yesterday's date in Asia/Shanghai, formatted ``YYYY-MM-DD``."""
        return (self.now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")

    def _expire_if_stale(self):
        """Drop all cached data together once the shared timeout has elapsed.

        _check_timeout() resets the shared timestamp when it fires, so the
        caches must be invalidated as a group; otherwise only the first cache
        to notice the timeout would ever be refreshed.
        """
        if self._check_timeout():
            self._base_data = None
            self._user_info = None
            self._security_mapping = None

    @property
    def base_data(self):
        """Cached ``to_json()`` of the base sheet, reloaded after a timeout."""
        self._expire_if_stale()
        if self._base_data is None:
            self._base_data = self.base_sheet.to_json()
        return self._base_data

    @property
    def user_info(self):
        """Cached ``to_json()`` of the user sheet, reloaded after a timeout."""
        self._expire_if_stale()
        if self._user_info is None:
            self._user_info = self.user_sheet.to_json()
        return self._user_info

    @property
    def security_mapping(self):
        """Map each company code to the ``_rid`` values of its follow-up owners.

        The mapping is built once and reused until the cache timeout elapses.
        Base rows without a company code are skipped, and owners that have no
        matching user row are left out.
        """
        self._expire_if_stale()
        if self._security_mapping is None:
            users = self._convert_user_info(self.user_info)
            mapping = {}
            for row in self.base_data["data"]:
                code = row.get("深交所上市公司代码")
                if code is None:
                    continue
                owners = row.get("跟进负责人", []) or []  # follow-up persons in charge
                record_ids = []
                for owner in owners:
                    rid = users.get(owner, {}).get('_rid')
                    if rid:
                        record_ids.append(rid)
                mapping[code] = record_ids
            self._security_mapping = mapping
        return self._security_mapping

    def _check_timeout(self):
        """Return True (and restart the timer) when ``time_out`` seconds have passed."""
        now = int(time.time())
        if now - self.timestamp > self.time_out:
            print("refresh")
            self.timestamp = now
            return True
        return False

    def refresh(self):
        """Reload base data and user info now and restart the cache timer."""
        self._base_data = self.base_sheet.to_json()
        self._user_info = self.user_sheet.to_json()
        # The derived mapping must be rebuilt from the fresh data.
        self._security_mapping = None
        self.timestamp = int(time.time())

    def generate_message(self, program: str, title: str, subject: str, content: str, tos: list, msg_type: str = "中性",
                         top: bool = False):
        """Assemble one notice-sheet row, dated today and in state ``待发布``."""
        return {
            "*发布日期": f"{self.today}",
            "*栏目#gName": program,
            "状态": "待发布",
            "*摘要#subject": subject,
            "置顶#isTop": top,
            "*详情#content": content,
            "*标题#title": title,
            "消息类型#msgType": msg_type,
            "*消息接收对象#sendList": tos
        }

    def using_info_message(self, data):
        """Build the product-usage message for one user-log row.

        Returns ``{}`` when the row has no ``公司代码``. The usage time is read
        as epoch milliseconds (string or int) and rendered in Asia/Shanghai,
        the same zone used for ``today`` / ``yesterday``.
        """
        code = data.get('公司代码')
        if not code:
            return {}
        name = data.get('公司名称', "")
        product_name = data.get('产品', "")
        using_count = data.get('使用动态口令次数', 0)
        user_name = data.get('用户名称', "")
        using_time = data.get('时间', "")
        if using_time:
            seconds = int(using_time) // 1000
            using_time = datetime.datetime.fromtimestamp(
                seconds, pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S")
        tos = self.security_mapping.get(code, [])
        return self.generate_message("客户动态",
                                     f"跟进客户使用产品情况({self.yesterday})",
                                     "点击可查看您所跟进的上市公司客户的产品动态口令获取记录。",
                                     f"|证券代码 |公司名称 |产品 |用户|使用时间|动态口令次数 |\n|:--|\n|{code} |{name} |{product_name} |{user_name}|{using_time}| {using_count} |\n",
                                     tos)

    def report_info_message(self, data):
        """Build the announcement message for one report row (``{}`` without ``代码``)."""
        code = data.get('代码')
        if not code:
            return {}
        name = data.get('简称', "")
        title = data.get('公告标题', "")
        report_type = data.get('公告类型', "")
        report_time = data.get('公告时间', "")
        report_uri = data.get('公告链接', "")
        tos = self.security_mapping.get(code, [])
        return self.generate_message("客户动态",
                                     f"跟进客户的公告披露信息({self.yesterday})",
                                     "点击可查看您所跟进的上市公司客户的公告披露信息。",
                                     f'|证券代码 |证券简称 |公告标题 |公告类型 |公告时间 |公告链接 |\n|:--|\n|{code} |{name} |{title} |{report_type} |{report_time}|<a data-href="{report_uri}">{report_uri}</a> | \n',
                                     tos)

    @staticmethod
    def _created_yesterday_filter(field_id):
        """Filter spec selecting records whose created-time field is yesterday."""
        return {
            "conjunction": "CONJUNCTION_AND",
            "conditions": [{
                "field_id": field_id,
                "field_type": "FIELD_TYPE_CREATED_TIME",
                "operator": "OPERATOR_IS",
                "date_time_value": {
                    "type": "DATE_TIME_TYPE_YESTERDAY"
                }
            }]
        }

    def prepared_message(self):
        """Queue messages for yesterday's reports and usage logs, then announce the count."""
        messages = []
        for record in self.report_sheet.search(self._created_yesterday_filter("fjVQAh")):
            message = self.report_info_message(self.report_sheet.convert_data(record["values"]))
            if message:
                messages.append(message)
        for record in self.user_log_sheet.search(self._created_yesterday_filter("fvJi8f")):
            message = self.using_info_message(self.user_log_sheet.convert_data(record["values"]))
            if message:
                messages.append(message)
        count = len(messages)
        self.notice_sheet.batch_add(messages)
        push_message("推送消息通知", f"今天是{self.today} 本次预计计推送{count}条消息", "")

    @staticmethod
    def push():
        """Call the message-push endpoint; return its decoded reply, or ``{}`` unless HTTP 200."""
        resp = requests.get("https://cloud.cmdp.cn/wapi/recommend_message_push")
        if resp.status_code == 200:
            return resp.json()
        return {}