bigdata-highway-analytics/App/爬虫.py
18796357645 11657ae4f5 上传
2025-03-31 20:06:02 +08:00

193 lines
7.0 KiB
Python

import requests
from tqdm import tqdm
import pymysql
# 连接到数据库
connection = pymysql.connect(host='localhost',
user='root',
password='123456',
cursorclass=pymysql.cursors.DictCursor, # 设置返回结果为字典类型
database='bs_jaotong')
cursor = connection.cursor()
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
'Cookie':'cna=jGJrHgCNgXIBASQOA6Fypcve; xlly_s=1; isg=BOnpxCMwqPzI-ZRIjwWwhSkd-JVDtt3oq4XyOovfz1BuUgxk0wQ-uRKLEPbkSnUg; tfstk=ekjvvAV7O7VmQWheroUl7K5BUCeuxiBqysWIjCAm5_CRMse2iCxcWfdR6E22jOOO6_APcIOMSh3O9sI0hh7q3a5hdmj0uhm90h-_tWq3x-W2bhMKMo6C3Z9LocV3xkXblK-GFW24T1Odmy0b0xancisMhyR6zF010YzBlB6XGtXcVoTmuTdRBOjXHTKfhaisfgpvkgzGxMIM2XApIqw8eVuwlLu9DgxbAl9rvLd3Hl3Z7UyyeB28wVuwlJJJt8fr7V8ly; user_unique_id=a1ac68b88d722bb9018e0cfc03e45d88; SESSION=074d5eba-96c3-4288-b3dc-3175a766ccdf'
}
def rank(url):
req = requests.get(url=url,headers=headers)
data = req.json()
# 构建元组列表
rankdata = [(item.get('rank'), item.get('adcode'), item.get('cityName'), item.get('idx'),
item.get('freeFlowSpeed'), item.get('realSpeed'), item.get('idx1'), item.get('ratio1w'),
item.get('healthValue'), item.get('healState')) for item in tqdm(data)]
# 准备SQL查询
sql = "INSERT INTO rank (rank, adcode, cityName,idx,freeFlowSpeed,realSpeed,idx1,ratio1w,healthValue,healState) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
cursor.executemany(sql, rankdata)
def countryindicators(url):
req = requests.get(url=url, headers=headers)
data = req.json()
# 构建元组列表
countryindicatorsdata = [(item.get('avg'), item.get('indicator'), item.get('maxValue'), item.get('numGTAvg'),
item.get('topCityName')) for item in tqdm(data)]
# 准备SQL查询
sql = "INSERT INTO countryindicators (`avg`, `indicator`, `maxValue`,`numGTAvg`,`topCityName`) VALUES (%s, %s, %s, %s, %s)"
cursor.executemany(sql, countryindicatorsdata)
'''用户延时排行榜'''
def delayed():
req = requests.get(url="https://report.amap.com/ajax/getCityRank.do",headers=headers)
data = req.json()
# 构建元组列表
rankdata = [
(item.get('freeFlowSpeed'), # 畅通速度
item.get('realSpeed'), # 平均速度
item.get('idxRatio'), # 周环比数
item.get('idxRatioState'), # 周环比状态
item.get('idx'), # 拥堵延时指数
item.get('label'), # 城市
item.get('name') #编号
) for item in tqdm(data)]
print(rankdata)
clear("jam")
# 准备SQL查询
sql = "INSERT INTO jam (freeFlowSpeed, realSpeed,idxRatio,idxRatioState,idx,label,`name`) VALUES (%s, %s, %s, %s, %s, %s, %s)"
cursor.executemany(sql, rankdata)
connection.commit()
def clear(table):
# 删除表中的所有数据(保留表结构)
delete_query = f"TRUNCATE TABLE {table}"
cursor.execute(delete_query)
# 确保提交更改
connection.commit()
def getCityCode():
req = requests.get(url="https://report.amap.com/ajax/getCityInfo.do?", headers=headers)
data = req.json()
# 构建元组列表
rankdata = [
(item.get('code'), # 代码
item.get('name'), # 城市
item.get('pinyin'), # 拼音
) for item in tqdm(data)]
clear("city_code")
# 准备SQL查询
sql = "INSERT INTO city_code (code, name,pinyin) VALUES (%s, %s, %s)"
cursor.executemany(sql, rankdata)
connection.commit()
def getCityHeadth():
sql = "SELECT code,name FROM city_code "
cursor.execute(sql)
# 获取查询结果
results = cursor.fetchall()
# 将结果转换为列表
data_list = []
for row in tqdm(results):
code = row['code']
name = row['name']
url = f"https://report.amap.com/ajax/cityDaily.do?cityCode={code}&dataType=2"
req = requests.get(url=url, headers=headers)
data = req.json()
# 构建元组列表
for item in data:
sql = "INSERT INTO city_week (code, `name`,res,inert_time) VALUES (%s, %s, %s, %s)"
# 定义要插入的数据
data_to_insert = (code,name,item[1],item[0]) #
print(data_to_insert)
# 执行插入操作
cursor.execute(sql, data_to_insert)
# 提交事务
connection.commit()
# 道路爬取
def daolu():
sql = "SELECT code,name FROM city_code "
cursor.execute(sql)
# 获取查询结果
results = cursor.fetchall()
# 将结果转换为列表
for row in tqdm(results):
code = row['code']
name = row['name']
url = f"https://report.amap.com/ajax/roadRank.do?roadType=0&timeType=0&cityCode={code}"
req = requests.get(url=url, headers=headers)
data = req.json()
# 构建元组列表
for item in data.get('tableData'):
print(item.get("name"))
sql = "INSERT INTO city_road (code, `name`,road_name,`index`,speed,travelTime,delayTime) VALUES (%s, %s, %s, %s, %s, %s, %s)"
# 定义要插入的数据
data_to_insert = (code, name, item.get("name"),item.get("index"), item.get("speed"), item.get("travelTime"), item.get("delayTime")) #
print(data_to_insert)
# 执行插入操作
cursor.execute(sql, data_to_insert)
# 提交事务
connection.commit()
import random
from datetime import datetime, timedelta
def mock():
clear("prediction")
# 生成随机数据的起始时间:当前时间的前两个月
start_date = datetime.now() - timedelta(days=60)
# 生成随机数据的结束时间:当前时间
end_date = datetime.now()
# 遍历时间范围,每天生成三条数据
data = []
while start_date < end_date:
# 生成三条数据
for _ in range(3):
# 生成随机的 idx
idx = round(random.uniform(0.55, 4), 2)
# 格式化时间为 yyyy-mm-dd
create_time = start_date.strftime('%Y-%m-%d')
# 将数据添加到列表中
data.append({'idx': idx, 'create_time': create_time})
# 增加一天
start_date += timedelta(days=1)
# 打印生成的数据
for item in data:
sql = "INSERT INTO prediction (idx, `create_time`) VALUES (%s, %s)"
# 定义要插入的数据
data_to_insert = (item.get("idx"), item.get("create_time")) #
# 执行插入操作
cursor.execute(sql, data_to_insert)
# 提交事务
connection.commit()
if __name__ == '__main__':
# 用户延时排行榜
# delayed()
# 获取城市代码
# getCityCode()
# daolu()
mock()
# rank('https://report.amap.com/diagnosis/rank.do')
# countryindicators('https://report.amap.com/diagnosis/ajax/countryindicators.do')
connection.commit()
cursor.close()
connection.close()