530 lines
18 KiB
Python
530 lines
18 KiB
Python
import datetime
|
||
import pytz
|
||
import requests
|
||
import time
|
||
|
||
|
||
def convert_to_utc(time_str, time_format='%Y-%m-%d %H:%M:%S', source_timezone=None):
|
||
# 解析时间字符串为datetime对象
|
||
local_dt = datetime.datetime.strptime(time_str, time_format)
|
||
|
||
# 如果指定了源时区,则将本地时间转换为UTC时间
|
||
if source_timezone:
|
||
tz = pytz.timezone(source_timezone)
|
||
local_dt = tz.localize(local_dt)
|
||
utc_dt = local_dt.astimezone(pytz.UTC)
|
||
else:
|
||
# 如果没有指定源时区,则假设时间字符串已经是UTC时间
|
||
utc_dt = pytz.UTC.localize(local_dt)
|
||
|
||
return utc_dt
|
||
|
||
|
||
def push_message(title: str, message: str, url: str):
|
||
url = "https://api.chuckfang.com/Eirf"
|
||
resp = requests.post(url, json={"title": title, "msg": message, "url": url})
|
||
|
||
|
||
def transform_data(data, mapping):
|
||
temp = {}
|
||
for k, v in mapping.items():
|
||
if k in data:
|
||
if k == "date":
|
||
utc_time = convert_to_utc(data[k], source_timezone='Asia/Shanghai').timestamp()
|
||
timestamp = str(int(utc_time))
|
||
temp[v] = timestamp + "000"
|
||
else:
|
||
temp[v] = data[k]
|
||
return temp
|
||
|
||
|
||
class SmartSheet:
|
||
def __init__(self, doc_id, sheet_id):
|
||
self.doc_id = doc_id
|
||
self.sheet_id = sheet_id
|
||
self._columns = None
|
||
self._data = None
|
||
|
||
@staticmethod
|
||
def _post(url, body):
|
||
resp = requests.post(url, json=body)
|
||
if resp.status_code == 200:
|
||
return resp.json()
|
||
return {}
|
||
|
||
def _get_columns(self):
|
||
url = "https://a.cmdp.cn/umss/v1/wxw/common/submit"
|
||
body = {
|
||
"url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_fields",
|
||
"method": "POST",
|
||
"body": {
|
||
"docid": self.doc_id,
|
||
"sheet_id": self.sheet_id
|
||
}
|
||
}
|
||
response = self._post(url, body)
|
||
if response:
|
||
fields = response["fields"]
|
||
return fields
|
||
return []
|
||
|
||
def _get_data(self, filters: dict, cursor: int = 0, limit: int = 1000):
|
||
url = "https://a.cmdp.cn/umss/v1/wxw/smartSheet/getRecords"
|
||
body = {
|
||
"docid": self.doc_id,
|
||
"sheet_id": self.sheet_id,
|
||
"filter_spec": filters,
|
||
"offset": cursor,
|
||
"limit": limit
|
||
}
|
||
response = self._post(url, body)
|
||
if response:
|
||
cursor = response["next"]
|
||
records = response["records"]
|
||
return cursor, records
|
||
return 0, []
|
||
|
||
def _add(self, records: list):
|
||
url = "https://a.cmdp.cn/umss/v1/wxw/smartSheet/addRecords"
|
||
body = {
|
||
"docid": self.doc_id,
|
||
"sheet_id": self.sheet_id,
|
||
"key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
|
||
"records": records
|
||
}
|
||
response = self._post(url, body)
|
||
if response:
|
||
return True
|
||
return False
|
||
|
||
def add_fields(self, fields: list):
|
||
url = "https://a.cmdp.cn/umss/v1/wxw/common/submit"
|
||
body = {
|
||
"url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/add_fields",
|
||
"method": "POST",
|
||
"body": {
|
||
"docid": self.doc_id,
|
||
"sheet_id": self.sheet_id,
|
||
"fields": fields
|
||
}
|
||
}
|
||
return self._post(url, body)
|
||
|
||
def delete_field(self, field_ids: list):
|
||
url = "https://a.cmdp.cn/umss/v1/wxw/common/submit"
|
||
body = {
|
||
"url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/delete_fields",
|
||
"method": "POST",
|
||
"body": {
|
||
"docid": self.doc_id,
|
||
"sheet_id": self.sheet_id,
|
||
"field_ids": field_ids
|
||
|
||
}
|
||
}
|
||
return self._post(url, body)
|
||
|
||
def transform_data(self, data: dict):
|
||
result = {}
|
||
columns = self.columns
|
||
for i in data:
|
||
type_info = columns[i]
|
||
i_type = type_info.get("field_type")
|
||
if i_type == "FIELD_TYPE_TEXT":
|
||
result[i] = [
|
||
{
|
||
"text": data[i],
|
||
"type": "text"
|
||
}
|
||
]
|
||
elif i_type == "FIELD_TYPE_SINGLE_SELECT":
|
||
options = {i["text"]: i for i in type_info['property_single_select']["options"]}
|
||
result[i] = [
|
||
options[data[i]]
|
||
]
|
||
else:
|
||
result[i] = data[i]
|
||
return result
|
||
|
||
def convert_data(self, data: dict):
|
||
va = {}
|
||
for v in data:
|
||
type_info = self.columns[v]
|
||
i_type = type_info.get("field_type")
|
||
if i_type == "FIELD_TYPE_TEXT":
|
||
va[v] = "".join([m["text"] for m in data[v]])
|
||
elif i_type == "FIELD_TYPE_DATE_TIME":
|
||
va[v] = 0 if data[v] is None else data[v]
|
||
elif i_type == "FIELD_TYPE_NUMBER":
|
||
va[v] = 0 if data[v] is None else data[v]
|
||
elif i_type == "FIELD_TYPE_SELECT":
|
||
va[v] = [m["text"] for m in data[v]]
|
||
elif i_type == "FIELD_TYPE_SINGLE_SELECT":
|
||
va[v] = data[v][0]["text"] if data[v] else ""
|
||
elif i_type == "FIELD_TYPE_TWOWAYLINKRECORDS":
|
||
va[v] = data[v]
|
||
elif i_type == "FIELD_TYPE_AUTONUMBER":
|
||
va[v] = data[v]["text"] if data[v] else ""
|
||
elif i_type == "FIELD_TYPE_CHECKBOX":
|
||
va[v] = data[v]
|
||
elif i_type == "FIELD_TYPE_REFERENCE":
|
||
va[v] = data[v]
|
||
else:
|
||
print(i_type, data[v], type_info)
|
||
return va
|
||
|
||
@property
|
||
def columns(self):
|
||
if self._columns is None:
|
||
self._columns = self._get_columns()
|
||
return {i["field_title"]: i for i in self._columns}
|
||
|
||
@property
|
||
def data(self):
|
||
if self._data is None:
|
||
self._data = self.search({})
|
||
return self._data
|
||
|
||
def search(self, filters: dict):
|
||
cursor = 0
|
||
result = []
|
||
while True:
|
||
cursor, records = self._get_data(filters, cursor=cursor)
|
||
result += records
|
||
if cursor == 0:
|
||
break
|
||
return result
|
||
|
||
def add(self, data: dict):
|
||
return self._add([{"values": self.transform_data(data)}])
|
||
|
||
def batch_add(self, data_list: list):
|
||
return self._add([{"values": self.transform_data(i)} for i in data_list])
|
||
|
||
def to_json(self):
|
||
data = []
|
||
for i in self.data:
|
||
values = i["values"]
|
||
va = self.convert_data(values)
|
||
va['_rid'] = i["record_id"]
|
||
data.append(va)
|
||
return {"columns": self.columns, "data": data}
|
||
|
||
|
||
class SmartMetadata(SmartSheet):
|
||
def __init__(self, doc_id, sheet_id=None):
|
||
super().__init__(doc_id, sheet_id)
|
||
|
||
def create(self):
|
||
url = "https://a.cmdp.cn/umss/v1/wxw/addSheet"
|
||
body = {
|
||
"docid": self.doc_id,
|
||
"properties": {
|
||
"title": "Metadata"
|
||
}
|
||
}
|
||
resp = self._post(url, body)
|
||
if resp.get("properties"):
|
||
self.sheet_id = resp["properties"]["sheet_id"]
|
||
self.add_fields([
|
||
{
|
||
"field_type": "FIELD_TYPE_TEXT",
|
||
"field_title": "名称"
|
||
},
|
||
{
|
||
"property_single_select": {
|
||
"options": [{"style": 13, "text": "doc"},
|
||
{"style": 7, "text": "sheet"},
|
||
{"style": 11, "text": "metadata"}
|
||
],
|
||
"is_multiple": False,
|
||
"is_quick_add": True
|
||
},
|
||
"field_type": "FIELD_TYPE_SINGLE_SELECT",
|
||
"field_title": "类型"
|
||
}, {
|
||
"field_type": "FIELD_TYPE_TEXT",
|
||
"field_title": "值"
|
||
}
|
||
])
|
||
self.delete_field([self.columns[i]["field_id"] for i in self.columns if i not in ["名称", "类型", "值"]])
|
||
self.update(self.sheet_id, sheet_name="Metadata", sheet_type="metadata")
|
||
|
||
def update(self, sheet_id, sheet_name, sheet_type="sheet"):
|
||
self.add({"名称": sheet_name, "类型": sheet_type, "值": sheet_id})
|
||
|
||
|
||
class SmartTableApi:
|
||
def __init__(self, doc_id=None):
|
||
self.doc_id = doc_id
|
||
self._metadata = None
|
||
self.metadata_table = None
|
||
if self.doc_id is not None:
|
||
self._get_sheet_list()
|
||
|
||
@staticmethod
|
||
def _post(url, body):
|
||
resp = requests.post(url, json=body)
|
||
if resp.status_code == 200:
|
||
return resp.json()
|
||
return {}
|
||
|
||
def create(self, doc_name: str, admin_users: list):
|
||
url = "https://a.cmdp.cn/umss/v1/wxw/addDocument"
|
||
body = {
|
||
"doc_type": 10,
|
||
"doc_name": doc_name,
|
||
"admin_users": admin_users
|
||
}
|
||
resp = self._post(url, body)
|
||
self.doc_id = resp.get("docid")
|
||
self._get_sheet_list()
|
||
self.metadata_table.update(self.doc_id, sheet_name=doc_name, sheet_type="doc")
|
||
|
||
def _get_sheet_list(self):
|
||
url = "https://a.cmdp.cn/umss/v1/wxw/common/submit"
|
||
body = {
|
||
"url": "https://qyapi.weixin.qq.com/cgi-bin/wedoc/smartsheet/get_sheet",
|
||
"method": "POST",
|
||
"body": {
|
||
"docid": self.doc_id
|
||
|
||
}
|
||
}
|
||
resp = self._post(url, body)
|
||
sheet_list = resp.get("sheet_list", [])
|
||
result = {i["title"]: i["sheet_id"] for i in sheet_list}
|
||
if "Metadata" in result:
|
||
metadata_id = result.get("Metadata")
|
||
self.metadata_table = SmartMetadata(self.doc_id, metadata_id)
|
||
del result["Metadata"]
|
||
else:
|
||
self.metadata_table = SmartMetadata(self.doc_id)
|
||
self.metadata_table.create()
|
||
self._metadata = result
|
||
|
||
@property
|
||
def metadata(self):
|
||
return self._metadata
|
||
|
||
def _add_sheet(self, sheet_name):
|
||
url = "https://a.cmdp.cn/umss/v1/wxw/addSheet"
|
||
body = {
|
||
"docid": self.doc_id,
|
||
"properties": {
|
||
"title": sheet_name
|
||
}
|
||
}
|
||
return self._post(url, body)
|
||
|
||
def add_sheet(self, sheet_name):
|
||
resp = self._add_sheet(sheet_name)
|
||
if resp.get("properties"):
|
||
self.metadata_table.update(sheet_id=resp["properties"]["sheet_id"], sheet_name=sheet_name)
|
||
return resp
|
||
|
||
def get_sheet(self, sheet_id):
|
||
return SmartSheet(self.doc_id, sheet_id)
|
||
|
||
def get_sheet_by_name(self, sheet_name):
|
||
if sheet_name in self.metadata:
|
||
return self.get_sheet(self.metadata[sheet_name])
|
||
else:
|
||
return None
|
||
|
||
|
||
class ESmart:
|
||
def __init__(self, time_out: int = 12 * 60 * 60):
|
||
|
||
self.time_out = time_out
|
||
self.timestamp = int(time.time())
|
||
self.report_sheet = SmartSheet(
|
||
"dc48YTUb9gNYCpIWg5v90-U5HIWT72bm3Fkw9Jvi33_sP3I0H32QEDFNOf69TS1bbUjdsYLmvArqqUootD34nm8Q", "L1wFol")
|
||
self.user_sheet = SmartSheet(
|
||
"dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw",
|
||
"8vVxEx")
|
||
self.notice_sheet = SmartSheet(
|
||
"dc1zao6J8YnS9p86FKKvKQadukmFXlXv3dU1qVQrgcjB81zdSVI7qoi9a7K_OpN4BTyXc8EbYVXxpnQNUUTMWwIw", "2943X5")
|
||
self.user_log_sheet = SmartSheet(
|
||
"dcqzSDHPl2ZWoLkEB3Id1wMMaW0meMSwGLjfnuHD0VMh0DVidKdal-3wYfsh7Zb1pGn9moDuzjvhtlRZAXoO-Hjg", "g6dTI2")
|
||
self.base_sheet = SmartSheet(
|
||
"dcqzSDHPl2ZWoLkEB3Id1wMMaW0meMSwGLjfnuHD0VMh0DVidKdal-3wYfsh7Zb1pGn9moDuzjvhtlRZAXoO-Hjg", "1gTulR")
|
||
self._base_data = None
|
||
self._user_info = None
|
||
self._security_mapping = None
|
||
|
||
@staticmethod
|
||
def _convert_user_info(data):
|
||
_data = {}
|
||
for i in data["data"]:
|
||
if "来源对象ID#eid" in i:
|
||
v = {
|
||
"eid": i["来源对象ID#eid"],
|
||
"name": i["名称#name"],
|
||
"value": i["数值#value"],
|
||
"type": i["类型"],
|
||
"_rid": i["_rid"],
|
||
}
|
||
k = v["name"].split("_")
|
||
k = k[0] if len(k) > 0 else ""
|
||
_data[k] = v
|
||
return _data
|
||
|
||
@property
|
||
def now(self):
|
||
now = datetime.datetime.now()
|
||
# 创建一个表示北京时区的对象
|
||
beijing = pytz.timezone('Asia/Shanghai')
|
||
# 将当前时间转换为北京时区
|
||
return now.astimezone(beijing)
|
||
|
||
@property
|
||
def today(self):
|
||
return self.now.strftime("%Y-%m-%d")
|
||
|
||
@property
|
||
def yesterday(self):
|
||
return (self.now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
|
||
|
||
@property
|
||
def base_data(self):
|
||
if self._base_data is None:
|
||
self._base_data = self.base_sheet.to_json()
|
||
if self._check_timeout():
|
||
self._base_data = self.base_sheet.to_json()
|
||
return self._base_data
|
||
|
||
@property
|
||
def user_info(self):
|
||
if self._user_info is None:
|
||
self._user_info = self.user_sheet.to_json()
|
||
if self._check_timeout():
|
||
self._user_info = self.user_sheet.to_json()
|
||
return self._user_info
|
||
|
||
@property
|
||
def security_mapping(self):
|
||
_data = {}
|
||
if self._security_mapping and self._check_timeout():
|
||
return self._security_mapping
|
||
else:
|
||
u_mapping = self._convert_user_info(self.user_info)
|
||
for i in self.base_data["data"]:
|
||
k = i["深交所上市公司代码"]
|
||
fpic = i.get("跟进负责人", []) # Follow-up person in charge
|
||
v = []
|
||
if fpic:
|
||
for p in fpic:
|
||
pid = u_mapping.get(p, {}).get('_rid')
|
||
if pid:
|
||
v.append(pid)
|
||
_data[k] = v
|
||
self._security_mapping = _data
|
||
return self._security_mapping
|
||
|
||
def _check_timeout(self):
|
||
now = int(time.time())
|
||
if now - self.timestamp > self.time_out:
|
||
print("refresh")
|
||
self.timestamp = now
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
def refresh(self):
|
||
self._base_data = self.base_sheet.to_json()
|
||
self._user_info = self.user_sheet.to_json()
|
||
|
||
def generate_message(self, program: str, title: str, subject: str, content: str, tos: list, msg_type: str = "中性",
|
||
top: bool = False):
|
||
data = {
|
||
"*发布日期": f"{self.today}",
|
||
"*栏目#gName": program,
|
||
"状态": "待发布",
|
||
"*摘要#subject": subject,
|
||
"置顶#isTop": top,
|
||
"*详情#content": content,
|
||
"*标题#title": title,
|
||
"消息类型#msgType": msg_type,
|
||
"*消息接收对象#sendList": tos
|
||
}
|
||
return data
|
||
|
||
def using_info_message(self, data):
|
||
code = data.get('公司代码')
|
||
if code:
|
||
name = data.get('公司名称', "")
|
||
product_name = data.get('产品', "")
|
||
using_count = data.get('使用动态口令次数', 0)
|
||
user_name = data.get('用户名称', "")
|
||
using_time = data.get('时间', "")
|
||
using_time = datetime.datetime.fromtimestamp(int(using_time[:-3])) if using_time else using_time
|
||
tos = self.security_mapping.get(code, [])
|
||
return self.generate_message("客户动态",
|
||
f"跟进客户使用产品情况({self.yesterday})",
|
||
"点击可查看您所跟进的上市公司客户的产品动态口令获取记录。",
|
||
f"|证券代码 |公司名称 |产品 |用户|使用时间|动态口令次数 |\n|:--|\n|{code} |{name} |{product_name} |{user_name}|{using_time}| {using_count} |\n",
|
||
tos)
|
||
return {}
|
||
|
||
def report_info_message(self, data):
|
||
code = data.get('代码')
|
||
if code:
|
||
name = data.get('简称', "")
|
||
title = data.get('公告标题', "")
|
||
report_type = data.get('公告类型', "")
|
||
report_time = data.get('公告时间', "")
|
||
report_uri = data.get('公告链接', "")
|
||
tos = self.security_mapping.get(code, [])
|
||
return self.generate_message("客户动态",
|
||
f"跟进客户的公告披露信息({self.yesterday})",
|
||
"点击可查看您所跟进的上市公司客户的公告披露信息。",
|
||
f'|证券代码 |证券简称 |公告标题 |公告类型 |公告时间 |公告链接 |\n|:--|\n|{code} |{name} |{title} |{report_type} |{report_time}|<a data-href="{report_uri}">{report_uri}</a> | \n',
|
||
tos)
|
||
return {}
|
||
|
||
def prepared_message(self):
|
||
count = 0
|
||
messages = []
|
||
for i in self.report_sheet.search({
|
||
"conjunction": "CONJUNCTION_AND",
|
||
"conditions": [{
|
||
"field_id": "fjVQAh",
|
||
"field_type": "FIELD_TYPE_CREATED_TIME",
|
||
"operator": "OPERATOR_IS",
|
||
"date_time_value": {
|
||
"type": "DATE_TIME_TYPE_YESTERDAY"
|
||
}
|
||
}]
|
||
}):
|
||
data = self.report_sheet.convert_data(i["values"])
|
||
message = self.report_info_message(data)
|
||
if message:
|
||
count += 1
|
||
messages.append(message)
|
||
for i in self.user_log_sheet.search({
|
||
"conjunction": "CONJUNCTION_AND",
|
||
"conditions": [{
|
||
"field_id": "fvJi8f",
|
||
"field_type": "FIELD_TYPE_CREATED_TIME",
|
||
"operator": "OPERATOR_IS",
|
||
"date_time_value": {
|
||
"type": "DATE_TIME_TYPE_YESTERDAY"
|
||
}
|
||
}]
|
||
}):
|
||
data = self.user_log_sheet.convert_data(i["values"])
|
||
message = self.using_info_message(data)
|
||
if message:
|
||
count += 1
|
||
messages.append(message)
|
||
self.notice_sheet.batch_add(messages)
|
||
push_message("推送消息通知", f"今天是{self.today} 本次预计计推送{count}条消息", "")
|
||
|
||
@staticmethod
|
||
def push():
|
||
resp = requests.get("https://cloud.cmdp.cn/wapi/recommend_message_push")
|
||
if resp.status_code == 200:
|
||
return resp.json()
|
||
return {}
|