Files
chat-bot/function/weather.py
lychang 64ce30fdfd init
2025-08-26 09:35:29 +08:00

84 lines
2.8 KiB
Python

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Filename: weather.py
# @Author: lychang
# @Time: 7/5/2023 5:53 PM
import re
import json
import requests
from fuzzywuzzy import process
from core.types import BaseTool
from extension.standard import resource_manager
class CityMatcher:
def __init__(self):
self.data = json.loads(resource_manager.get("json", "city.json").decode('utf-8'))
self.area_map = self._build_area_map()
def _build_area_map(self):
area_map = {}
for province, cities in self.data.items():
for city, districts in cities.items():
for district, info in districts.items():
area_map[info['NAMECN']] = info['AREAID']
return area_map
def find_area_id(self, area_name, threshold=80):
match = process.extract(area_name, self.area_map.keys(), limit=5)
return [self.area_map[i[0]] for i in match if i[1] >= threshold]
matcher = CityMatcher()
class WeatherSearch(BaseTool):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.0.0"}
def __init__(self):
name = "weather_search"
description = "在提及天气时,对天气数据进行查询"
super(WeatherSearch, self).__init__(name, description)
self.execute = self.search
@staticmethod
def _get_city_code(message: str):
area_id = matcher.find_area_id(message)
if area_id:
return area_id
else:
return ["101030100"]
@staticmethod
def _get_sub_task(message: str):
return message.split("sub_task:")[-1] if "sub_task:" in message else message
def _get_city_info(self, city_code: str):
url = f"http://www.weather.com.cn/weather/{city_code}.shtml"
resp = requests.get(url, headers=self.headers)
resp.encoding = "utf-8"
weather_html = re.findall('<ul class="t clearfix">(.*?)</ul>', resp.text, re.S)[0].replace("\n", "")
weather_info = re.findall(
'<h1>(.*?)</h1>.*?<p.*?>(.*?)</p><p class="tem">(.*?)</p><p '
'class="win">',
weather_html)
result = city_code + "\n"
for wea in weather_info:
result += " ".join(wea) + "\n"
result = result.replace("</span>", "<span>").replace("<span>", "")
result = result.replace("</i>", "<i>").replace("<i>", "")
return result
def search(self, message: str):
city_info = ""
sub_task = self._get_sub_task(message)
for city_code in self._get_city_code(sub_task):
city_info += self._get_city_info(city_code)
return self.normal(city_info)
weather_search = WeatherSearch()