Files
Esmart/lib.py
lychang 37af6c33ee init
2025-09-16 01:54:46 +08:00

517 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import requests
class WXworkAuth:
    """Fetch and cache a WeCom (WeChat Work) access token.

    The token is requested from the ``gettoken`` endpoint on first use and
    requested again once the cached one is older than ``TOKEN_LIFETIME``
    seconds.
    """

    TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
    # Age in seconds after which the cached token is considered expired.
    TOKEN_LIFETIME = 7200
    # Seconds to wait for the token endpoint before the request is aborted.
    REQUEST_TIMEOUT = 10

    def __init__(self, app_id, app_key):
        """Store the credentials; no request is made until a token is needed.

        Args:
            app_id: Value sent as ``corpid``.
            app_key: Value sent as ``corpsecret``.
        """
        # The "crop" spelling is kept because these are public attributes.
        self.crop_id = app_id
        self.crop_secret = app_key
        self.access_token = None
        self.timestamp = None

    def _get_access_token(self):
        """Request a new token and cache it together with its fetch time.

        Raises:
            Exception: If the response JSON has no ``access_token`` field.
        """
        requested_at = int(time.time())
        # ``params`` URL-encodes the credentials instead of pasting them raw.
        response = requests.get(
            self.TOKEN_URL,
            params={"corpid": self.crop_id, "corpsecret": self.crop_secret},
            timeout=self.REQUEST_TIMEOUT,
        )
        access_info = response.json()
        if 'access_token' not in access_info:
            raise Exception('获取access_token失败')
        self.access_token = access_info['access_token']
        # Stamp only after success: a failed refresh must not make a stale
        # token look freshly issued.
        self.timestamp = requested_at

    def get_access_token(self):
        """Return the cached token, refreshing it first if missing or expired."""
        if self._needs_refresh():
            self.refresh_token()
        return self.access_token

    def _needs_refresh(self):
        """Tell whether there is no usable cached token."""
        if not self.access_token or self.timestamp is None:
            return True
        return (int(time.time()) - self.timestamp) > self.TOKEN_LIFETIME

    def refresh_token(self):
        """Fetch a new token, retrying once if the first attempt fails.

        Raises:
            Exception: Whatever the second attempt raises.
        """
        try:
            self._get_access_token()
        except Exception as e:
            print(e, '重新获取')
            self._get_access_token()
class CloudFunctionAPI:
    """Client for the user and tag HTTP endpoints under ``base_url``."""

    def __init__(self, base_url):
        """Remember the root URL that every endpoint path is appended to."""
        self.base_url = base_url

    def _post_user_query(self, field: str, value: str):
        """POST a first-page ``userQuery`` filtered on one field; return the response."""
        body = {
            field: value,
            "status": 10,
            "PageSize": 10,
            "PageNo": 1
        }
        return requests.post(f"{self.base_url}/in/api/userQuery", json=body)

    def get_user_info_by_id(self, user_id: str) -> dict:
        """Return the ``data`` object for ``user_id``.

        Returns ``{}`` on a non-200 response or when ``data`` is absent or null.
        """
        url = f"{self.base_url}/in/api/user"
        response = requests.get(url, params={"uid": user_id})
        if response.status_code != 200:
            return {}
        return response.json().get('data') or {}

    def get_user_info_by_sec(self, security_code: str) -> list:
        """Return the users whose ``secCode`` matches ``security_code``.

        Returns ``[]`` on a non-200 response, and ``[{}]`` when the response
        has no ``list`` field.
        """
        response = self._post_user_query("secCode", security_code)
        if response.status_code != 200:
            return []
        user_info = response.json().get("list")
        if user_info is None:
            user_info = [{}]
        return user_info

    def get_user_info_by_name(self, user_name: str) -> dict:
        """Return the first user whose ``loginName`` matches ``user_name``.

        Returns ``{}`` on a non-200 response or when nothing matched.
        """
        response = self._post_user_query("loginName", user_name)
        if response.status_code != 200:
            return {}
        matches = response.json().get("list")
        # An empty list means "no match" too; indexing it would raise.
        if not matches:
            return {}
        return matches[0]

    @staticmethod
    def parse_tags(tags: list):
        """Expand ``_``-separated tag strings into a nested dict.

        Each tag is split on ``_``; every part but the last becomes a nested
        dict level and the last part maps to ``True``, so
        ``["role_admin"]`` gives ``{"role": {"admin": True}}``.

        A later tag replaces whatever an earlier tag stored under the same
        part: ``["a_b", "a_c"]`` gives ``{"a": {"c": True}}``.
        NOTE(review): confirm replacing (rather than merging) is intended.
        """
        parsed = {}
        for tag in tags:
            *parents, leaf = tag.split("_")
            node = parsed
            for part in parents:
                node[part] = {}
                node = node[part]
            node[leaf] = True
        return parsed

    def get_user_tags(self, user_id: str = "", user_name: str = "") -> dict:
        """Return the parsed tags of a user looked up by id, else by name.

        Returns ``{}`` when neither ``user_id`` nor ``user_name`` is given.
        """
        if user_id:
            user_info = self.get_user_info_by_id(user_id)
        elif user_name:
            user_info = self.get_user_info_by_name(user_name)
        else:
            return {}
        return self.parse_tags(user_info.get('tags') or [])

    def update_user_tag(self, user_id: str, key: str, value: str):
        """Send the single tag ``{key}_{value}`` for ``user_id``.

        Returns the response JSON, or ``{}`` on a non-200 response.
        """
        return self.update_user_tags(user_id, {key: value})

    def update_user_tag_by_name(self, user_name: str, key: str, value: str):
        """Resolve ``user_name`` to an id and send ``{key}_{value}`` for it.

        Returns ``{}`` when the user has no ``id``.
        """
        user_id = self.get_user_info_by_name(user_name).get('id')
        if not user_id:
            return {}
        return self.update_user_tag(user_id, key, value)

    def update_user_tags(self, user_id: str, tags: dict):
        """Send one ``{key}_{value}`` tag per item of ``tags`` for ``user_id``.

        Returns the response JSON, or ``{}`` on a non-200 response.
        """
        url = f"{self.base_url}/in/api/userTags"
        body = {
            "userId": user_id,
            "tags": [f"{key}_{value}" for key, value in tags.items()]
        }
        response = requests.post(url, json=body)
        if response.status_code != 200:
            return {}
        return response.json()
class WXworkBaseAPI:
    """Base for WeCom API wrappers that take their token from a ``WXworkAuth``."""

    def __init__(self, auth: WXworkAuth):
        """Keep a reference to the auth object used for every request."""
        self.auth = auth

    @property
    def access_token(self):
        """Token obtained from the auth object on each read."""
        token = self.auth.get_access_token()
        return token
class WXworkApprovalAPI(WXworkBaseAPI):
    """Wrapper around the WeCom approval (``oa``) endpoints."""

    def get_approval_list(self, template_id: str, start_time: int, end_time: int) -> list:
        """Return the approval numbers for one template in a time window.

        Only records with ``sp_status`` ``"2"`` are requested. Pages of 100
        are fetched until the service stops returning a continuation cursor.

        Args:
            template_id: Value for the ``template_id`` filter.
            start_time: Window start, sent as a string in ``starttime``.
            end_time: Window end, sent as a string in ``endtime``.

        Returns:
            The collected ``sp_no_list`` entries. Empty when the first
            request is not HTTP 200 or the payload has no ``sp_no_list``;
            if a later page fails, the entries gathered so far are returned.
        """
        body = {
            "starttime": str(start_time),
            "endtime": str(end_time),
            "new_cursor": "",
            "size": 100,
            "filters": [
                {
                    "key": "template_id",
                    "value": template_id
                },
                {
                    "key": "sp_status",
                    "value": "2"
                }
            ]
        }
        sp_numbers = []
        while True:
            url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovalinfo?access_token={self.access_token}"
            response = requests.post(url, json=body)
            if response.status_code != 200:
                break
            page = response.json()
            # Error payloads arrive with HTTP 200 but without ``sp_no_list``.
            page_numbers = page.get("sp_no_list") or []
            sp_numbers.extend(page_numbers)
            next_cursor = page.get("new_next_cursor")
            # Stop on the last page, and never re-request the same cursor.
            if not page_numbers or not next_cursor or next_cursor == body["new_cursor"]:
                break
            body["new_cursor"] = next_cursor
        return sp_numbers

    def get_approval_info(self, approval_id: str) -> dict:
        """Return the response JSON of ``getapprovaldetail`` for ``approval_id``."""
        url = f"https://qyapi.weixin.qq.com/cgi-bin/oa/getapprovaldetail?access_token={self.access_token}"
        body = {
            "sp_no": approval_id
        }
        response = requests.post(url, json=body)
        return response.json()

    def get_approval_list_weekly(self, template_id) -> list:
        """Return approval numbers for the 7-day window ending on Sunday.

        The window ends at the current time of day on the coming Sunday
        (today when today is Sunday, by local time) and starts exactly seven
        days before that.
        """
        current_time = time.time()
        # Local-time struct for "now".
        current_local_time = time.localtime(current_time)
        # Day of week: 0 = Monday ... 6 = Sunday.
        weekday = current_local_time.tm_wday
        # Whole days from today until the next Sunday (0 on a Sunday).
        days_until_sunday = (6 - weekday) % 7
        this_weekday = int(current_time + (days_until_sunday * 86400))
        last_weekday = int(this_weekday - 7 * 24 * 3600)
        return self.get_approval_list(template_id, last_weekday, this_weekday)
class WXworkCalendarAPI(WXworkBaseAPI):
    """Wrapper around the WeCom calendar and schedule (``oa``) endpoints."""

    @staticmethod
    def _post_json(url, payload):
        """POST ``payload`` as JSON to ``url`` and return the decoded reply."""
        return requests.post(url, json=payload).json()

    def create_calendar(self, calendar_name, userid):
        """Create a private calendar administered by and shared with ``userid``.

        Returns the new ``cal_id`` when the reply contains one, otherwise the
        whole reply.
        """
        endpoint = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/add?access_token={self.access_token}"
        share_entry = {
            "userid": userid,
            "permission": 1
        }
        calendar_spec = {
            "admins": [userid],
            "set_as_default": 1,
            "summary": calendar_name,
            "color": "#FF3030",
            "description": "访问日历",
            "is_public": 0,
            "is_corp_calendar": 0,
            "shares": [share_entry],
        }
        reply = self._post_json(endpoint, {"calendar": calendar_spec})
        return reply["cal_id"] if "cal_id" in reply else reply

    def delete_calendar(self, calendar_id):
        """Delete the calendar ``calendar_id`` and return the reply."""
        endpoint = f"https://qyapi.weixin.qq.com/cgi-bin/oa/calendar/del?access_token={self.access_token}"
        return self._post_json(endpoint, {"cal_id": calendar_id})

    def get_calendar_list(self, calendar_id):
        """Return the reply listing up to 1000 schedules of ``calendar_id``."""
        endpoint = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get_by_calendar?access_token={self.access_token}"
        query = {
            "cal_id": calendar_id,
            "offset": 0,
            "limit": 1000
        }
        return self._post_json(endpoint, query)

    def create_schedule(self, calendar_id, schedule_name, start_time, end_time, user_id, content, address=""):
        """Add a schedule for ``user_id`` to ``calendar_id`` and return the reply.

        When ``start_time`` equals ``end_time`` the end is replaced by the
        string form of ``int(start_time) + 1``. A reminder is requested 3600
        seconds before the event.
        """
        endpoint = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/add?access_token={self.access_token}"
        finish = str(int(start_time) + 1) if start_time == end_time else end_time
        reminder_spec = {
            "is_remind": 1,
            "remind_before_event_secs": 3600,
            "timezone": 8
        }
        schedule_spec = {
            "admins": [user_id],
            "start_time": start_time,
            "end_time": finish,
            "is_whole_day": 0,
            "attendees": [{"userid": user_id}],
            "summary": schedule_name,
            "description": content,
            "reminders": reminder_spec,
            "location": address,
            "cal_id": calendar_id
        }
        return self._post_json(endpoint, {"schedule": schedule_spec})

    def get_schedule(self, schedule_id):
        """Return the reply describing the schedule ``schedule_id``."""
        endpoint = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/get?access_token={self.access_token}"
        return self._post_json(endpoint, {"schedule_id_list": [schedule_id]})

    def delete_schedule(self, schedule_id):
        """Delete the schedule ``schedule_id`` and return the reply."""
        endpoint = f"https://qyapi.weixin.qq.com/cgi-bin/oa/schedule/del?access_token={self.access_token}"
        return self._post_json(endpoint, {"schedule_id": schedule_id})
class WXworkUserInfoAPI(WXworkBaseAPI):
    """Wrapper around the WeCom ``user/get`` endpoint."""

    def get_user_info(self, user_id):
        """Return the decoded reply of ``user/get`` for ``user_id``."""
        endpoint = f"https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token={self.access_token}&userid={user_id}"
        return requests.get(endpoint).json()
class WXworkDocAPI:
    """Wrapper around the WeCom drive (``wedrive``) and document (``wedoc``) endpoints.

    Every request method returns the decoded response JSON, or ``{}`` when
    the HTTP status is not 200.
    """

    _API_ROOT = "https://qyapi.weixin.qq.com/cgi-bin"

    def __init__(self, auth: "WXworkAuth", space_id):
        """Bind the client to an auth object and a default drive space.

        Args:
            auth: Object whose ``get_access_token()`` supplies the token.
            space_id: Space used by the space-level calls and as the default
                parent for new documents.
        """
        self.space_id = space_id
        self.auth = auth
        # Fetch once up front so bad credentials still fail at construction.
        self.auth.get_access_token()

    @property
    def access_token(self):
        """Token read from the auth object on every access.

        Reading it each time, rather than storing it at construction, lets
        the auth object hand out a refreshed token after the old one expires.
        """
        return self.auth.get_access_token()

    def _post(self, path: str, body: dict) -> dict:
        """POST ``body`` as JSON to ``path``; return the JSON reply or ``{}``."""
        url = f"{self._API_ROOT}/{path}?access_token={self.access_token}"
        response = requests.post(url, json=body)
        if response.status_code != 200:
            return {}
        return response.json()

    def create_space(self, space_name, admin_userid="lychang"):
        """Create a drive space named ``space_name``.

        Args:
            space_name: Name of the new space.
            admin_userid: User given ``auth`` level 7 on the space; defaults
                to the previously hard-coded ``"lychang"``.
        """
        body = {
            "space_name": space_name,
            "auth_info": [{
                "type": 1,
                "userid": admin_userid,
                "auth": 7
            }],
            "space_sub_type": 0
        }
        return self._post("wedrive/space_create", body)

    def get_space_info(self):
        """Return the ``space_info`` reply for ``self.space_id``."""
        return self._post("wedrive/space_info", {"spaceid": self.space_id})

    def get_space_file_list(self):
        """Return the first 100 entries listed directly under the space."""
        body = {
            "spaceid": self.space_id,
            "fatherid": self.space_id,
            "sort_type": 1,
            "start": 0,
            "limit": 100
        }
        return self._post("wedrive/file_list", body)

    def get_document_info(self, doc_id):
        """Return the ``document/get`` reply for ``doc_id``.

        Also stores ``doc_id`` on ``self.doc_id``, as the original did.
        """
        self.doc_id = doc_id
        return self._post("wedoc/document/get", {"docid": self.doc_id})

    def get_table_info(self, doc_id, sheet_id, view_ids: list):
        """Return the ``get_views`` reply for the given views (limit 1)."""
        body = {
            "docid": doc_id,
            "sheet_id": sheet_id,
            "view_ids": view_ids,
            "offset": 0,
            "limit": 1
        }
        return self._post("wedoc/smartsheet/get_views", body)

    def get_table_data(self, doc_id, sheet_id, view_id):
        """Return the first 100 records of a view, keyed by field title."""
        body = {
            "docid": doc_id,
            "sheet_id": sheet_id,
            "view_id": view_id,
            "record_ids": [],
            "key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
            "field_titles": [],
            "field_ids": [],
            "sort": [],
            "offset": 0,
            "limit": 100
        }
        return self._post("wedoc/smartsheet/get_records", body)

    def get_sheet_data(self, doc_id):
        """Return the ``get_sheet`` reply for ``doc_id``."""
        return self._post("wedoc/smartsheet/get_sheet", {"docid": doc_id})

    def create_table(self, doc_name, user_ids: list, father_id=None):
        """Create a document with ``doc_type`` 10 named ``doc_name``.

        ``doc_type`` values: 3 = document, 4 = spreadsheet, 10 = smart sheet.

        Args:
            doc_name: Name of the new document.
            user_ids: Users sent as ``admin_users``.
            father_id: Parent folder id; falls back to ``self.space_id``.
        """
        body = {
            "doc_type": 10,
            "doc_name": doc_name,
            "admin_users": user_ids
        }
        if self.space_id:
            body["spaceid"] = self.space_id
        parent_id = father_id or self.space_id
        # Leave ``fatherid`` out rather than sending a null or empty value.
        if parent_id:
            body["fatherid"] = parent_id
        return self._post("wedoc/create_doc", body)

    def add_row(self, doc_id, sheet_id, records: list):
        """Append ``records`` to a sheet, with cells keyed by field title."""
        body = {
            "docid": doc_id,
            "sheet_id": sheet_id,
            "key_type": "CELL_VALUE_KEY_TYPE_FIELD_TITLE",
            "records": records
        }
        return self._post("wedoc/smartsheet/add_records", body)
def parse_data(data: dict, mapping: dict) -> dict:
    """Flatten an approval-detail dict into named fields.

    Each entry of ``data["apply_data"]["contents"]`` is reduced to one value
    according to its ``control``:

    * ``Text``: ``value["text"]``
    * ``Number``: ``value["new_number"]``
    * ``Date``: ``value["date"]["s_timestamp"]``
    * ``Selector``: the list of option keys when the selector ``type`` is
      ``"multi"``, otherwise the first option key (``""`` if no options)
    * anything else: ``""``

    Args:
        data: Approval detail containing ``applyer`` and ``apply_data``.
        mapping: Control id -> output field name.

    Returns:
        A dict with one entry per ``mapping`` value (``None`` when that
        control id is absent) plus ``"_uid"`` holding the applicant's
        ``userid``.
    """
    applicant = data.get("applyer") or {}
    contents = (data.get("apply_data") or {}).get("contents") or []

    values_by_id = {}
    for item in contents:
        control = item.get("control")
        value = item.get("value") or {}
        if control == "Text":
            result = value.get("text")
        elif control == "Number":
            result = value.get("new_number")
        elif control == "Date":
            result = (value.get("date") or {}).get("s_timestamp")
        elif control == "Selector":
            # Both selector kinds keep their options under ``value``.
            selector = value.get("selector") or {}
            options = selector.get("options") or []
            if selector.get("type") == "multi":
                result = [option["key"] for option in options]
            else:
                result = options[0]["key"] if options else ""
        else:
            result = ""
        values_by_id[item.get("id")] = result

    parsed = {field: values_by_id.get(control_id) for control_id, field in mapping.items()}
    parsed["_uid"] = applicant.get("userid")
    return parsed