bigdata-highway-analytics/App/apis.py
18796357645 11657ae4f5 上传
2025-03-31 20:06:02 +08:00

415 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import requests
from flask import jsonify, request
from flask_restful import Resource, marshal_with, fields, reqparse
from App.exts import db
from App.models import User, CityCode, jam
from App.爬虫 import delayed, mock
from sqlalchemy import text
from flask import jsonify
from flask_restful import Resource
user_fields = {
'id': fields.Integer,
'nick_name': fields.String,
'username': fields.String,
'password': fields.String,
}
login_fields = {
'status': fields.Integer,
'msg': fields.String,
'data': fields.Nested(user_fields),
}
void_fields = {
'status': fields.Integer,
'msg': fields.String,
}
class loginResource(Resource):
@marshal_with(void_fields)
def post(self):
# 获取请求中的用户名和密码
data = request.get_json()
username = data['username']
password = data['password']
# 根据条件查询用户,例如根据用户名查询
is_username = User.query.filter_by(username=username).first()
is_username_password = User.query.filter_by(username=username,password=password).first()
if is_username is None:
return {
'status': 400,
'msg': '账号不正确或不存在',
}
elif is_username_password is None:
return {
'status': 400,
'msg': '密码不正确',
}
elif is_username_password is not None:
return {
'status': 200,
'msg': '登录成功',
}
else:
return {
'status': 400,
'msg': '异常登录,联系管理员',
}
# 注册
class signResource(Resource):
@marshal_with(void_fields)
def post(self):
# 解析请求参数
parser = reqparse.RequestParser()
parser.add_argument('username', type=str, required=True, help='Username is required')
parser.add_argument('password', type=str, required=True, help='Password is required')
args = parser.parse_args()
# 检查用户名和邮箱是否已经存在
existing_user = User.query.filter_by(username=args['username']).first()
if existing_user:
return {'status': 400,'msg': '用户名已经存在'}
else:
# 创建新用户
new_user = User(username=args['username'],password=args['password'])
# 保存到数据库
db.session.add(new_user)
db.session.commit()
return {
'status': 200,
'msg': '注册成功',
}
# 城市列表/////////////////////////////////////////////////
class cityCode(Resource):
def post(self):
# 获取请求中的用户名和密码
data = request.get_json()
pagesize = data['pagesize']
page = data['page']
print(pagesize, page)
pagesize = pagesize
offset = pagesize * (page - 1)
code_list = []
codes = CityCode.query.offset(offset).limit(pagesize).all() # 分页查询语法
for s in codes:
dic = {}
dic['code'] = s.code
dic['name'] = s.name
dic['pinyin'] = s.pinyin
code_list.append(dic)
return code_list
class jamApi(Resource):
def post(self):
# 获取请求中的用户名和密码
data = request.get_json()
pagesize = data['pagesize']
page = data['page']
pagesize = pagesize
offset = pagesize * (page - 1)
code_list = []
codes = jam.query.offset(offset).limit(pagesize).all() # 分页查询语法
for s in codes:
dic = {}
dic['id'] = s.id
dic['freeFlowSpeed'] = s.freeFlowSpeed
dic['realSpeed'] = s.realSpeed
dic['idxRatio'] = s.idxRatio
dic['idxRatioState'] = s.idxRatioState
dic['idx'] = s.idx
dic['label'] = s.label
dic['name'] = s.name
dic['create_time'] = str(s.create_time)
code_list.append(dic)
return code_list
class acquisitionApi(Resource):
def get(self):
delayed()
return {'code':200}
class forecast(Resource):
def get(self):
mock()
import pymysql
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
# 连接MySQL数据库
conn = pymysql.connect(host='localhost', user='root', password='123456', db='bs_jaotong')
# 创建cursor对象
cur = conn.cursor()
# 执行SQL查询
cur.execute("SELECT create_time as name, round(avg(`idx`), 2) as value FROM prediction GROUP BY create_time")
# 获取查询结果
rows = cur.fetchall()
# 获取列名
columns = [desc[0] for desc in cur.description]
# 将数据转换为DataFrame
df = pd.DataFrame(rows, columns=columns)
# 确定最后日期并计算未来五天日期
last_date = pd.to_datetime(df['name'].iloc[-1])
future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=5, freq='D')
future_dates_str = future_dates.strftime('%Y-%m-%d')
# 创建包含未来五天日期的新DataFrame
future_df = pd.DataFrame({'name': future_dates_str})
# 合并原始DataFrame和未来日期的DataFrame
merged_df = pd.concat([df, future_df])
# 拟合ARIMA模型
model = ARIMA(merged_df['value'], order=(1, 1, 1))
model_fit = model.fit()
# 预测未来五天数据
forecast = model_fit.forecast(steps=5)
# 提取预测结果中未来五天的数据
future_forecast = pd.DataFrame({'name': future_dates_str, 'value': forecast})
# 关闭cursor和连接
cur.close()
conn.close()
return jsonify( future_forecast.values.tolist())
# 用户列表/////////////////////////////////////////////////
class userPage(Resource):
def get(self):
result = db.session.execute("select * from sys_user ")
# 将查询结果转换为字典
data = [{column: value for column, value in row.items()} for row in result]
return jsonify(data)
class delUser(Resource):
def post(self):
try:
data = request.get_json()
user_id = data['id']
user = User.query.get(user_id)
print(user.username)
if user:
db.session.delete(user)
db.session.commit()
print("删除")
return {'code': 200,'msg': '删除成功', }
except:
return {'code': 400, 'msg': '删除失败,可能值不存在', }
class Top1(Resource):
def get(self):
try:
result = db.session.execute(text("SELECT * FROM jam LIMIT 1"))
columns = result.keys()
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
class CongestionCityOrder(Resource):
def get(self):
try:
result = db.session.execute(text(
"SELECT label, freeFlowSpeed, realSpeed FROM jam "
"ORDER BY freeFlowSpeed, realSpeed DESC LIMIT 10"
))
columns = result.keys()
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
class CongestionCityOrderList(Resource):
def get(self):
try:
result = db.session.execute(text("SELECT * FROM jam LIMIT 10"))
columns = result.keys()
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
class rankList(Resource):
def get(self):
try:
result = db.session.execute(text("SELECT * FROM rank LIMIT 10"))
columns = result.keys()
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
class rankListView(Resource):
def get(self):
try:
result = db.session.execute(text(
"SELECT * FROM rank ORDER BY idx DESC LIMIT 10"
))
columns = result.keys()
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
class rankListViewPie(Resource):
def get(self):
try:
result = db.session.execute(text(
"SELECT healState as name, count(*) as value "
"FROM rank GROUP BY healState ORDER BY name"
))
columns = result.keys()
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
class headthView(Resource):
def get(self):
try:
result = db.session.execute(text(
"SELECT indicator as name, round(avg, 2) as value "
"FROM countryindicators"
))
columns = result.keys()
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
class getTimeHeath(Resource):
def get(self):
# 使用 text() 包装 SQL 语句
sql = text("SELECT DISTINCT code FROM city_code")
results = db.session.execute(sql)
# 获取结果的方式取决于你的数据库驱动
# 方法1如果使用 SQLAlchemy Core
code_list = [row[0] for row in results] # 使用索引访问第一列
if not code_list:
return jsonify({"error": "No data found"}), 404
random_element = random.choice(code_list)
# 使用参数化查询防止 SQL 注入
query_sql = text("""
SELECT name, res, DATE_FORMAT(FROM_UNIXTIME(inert_time / 1000), '%Y-%d-%y') as time
FROM city_week
WHERE code = :code
""")
result = db.session.execute(query_sql, {'code': int(random_element)})
# 获取列名
columns = result.keys()
# 构建结果字典
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
class getRoadHeath(Resource):
def get(self):
try:
# 使用text()包装SQL并参数化查询
# 获取不重复的code列表
distinct_codes = db.session.execute(
text("SELECT DISTINCT code FROM city_code")
)
# 安全处理结果 - 方法1适用于大多数驱动
code_list = [row[0] for row in distinct_codes] # 使用索引访问第一列
# 或者方法2如果驱动支持:
# code_list = [row.code for row in distinct_codes]
if not code_list:
return jsonify({"error": "No city codes found"}), 404
# 随机选择一个code
random_code = random.choice(code_list)
# 使用参数化查询防止SQL注入
road_data = db.session.execute(
text("SELECT * FROM city_road WHERE code = :code"),
{"code": int(random_code)}
)
# 获取列名并构建结果字典
columns = road_data.keys()
data = [dict(zip(columns, row)) for row in road_data]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
#城市拥堵指数占比分析
class cityListViewPie(Resource):
def get(self):
try:
# 使用text()包装SQL查询
result = db.session.execute(
text("""
SELECT name, avg(speed) as value
FROM city_road
GROUP BY name
LIMIT 20
""")
)
# 获取列名并构建结果字典
columns = result.keys()
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
class dataCount(Resource):
def get(self):
try:
# 使用text()包装SQL查询
result = db.session.execute(
text("""
SELECT sum(a.Rows) as dataCount
FROM (
SELECT TABLE_NAME AS `Table`, TABLE_ROWS AS `Rows`
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'bs_jaotong'
) a
""")
)
# 获取列名并构建结果字典
columns = result.keys()
data = [dict(zip(columns, row)) for row in result]
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
class jiashiData(Resource):
def get(self):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
'Cookie': 'cna=jGJrHgCNgXIBASQOA6Fypcve; xlly_s=1; isg=BOnpxCMwqPzI-ZRIjwWwhSkd-JVDtt3oq4XyOovfz1BuUgxk0wQ-uRKLEPbkSnUg; tfstk=ekjvvAV7O7VmQWheroUl7K5BUCeuxiBqysWIjCAm5_CRMse2iCxcWfdR6E22jOOO6_APcIOMSh3O9sI0hh7q3a5hdmj0uhm90h-_tWq3x-W2bhMKMo6C3Z9LocV3xkXblK-GFW24T1Odmy0b0xancisMhyR6zF010YzBlB6XGtXcVoTmuTdRBOjXHTKfhaisfgpvkgzGxMIM2XApIqw8eVuwlLu9DgxbAl9rvLd3Hl3Z7UyyeB28wVuwlJJJt8fr7V8ly; user_unique_id=a1ac68b88d722bb9018e0cfc03e45d88; SESSION=074d5eba-96c3-4288-b3dc-3175a766ccdf'
}
req = requests.get(url="https://report.amap.com/sanjiyisu/roads.do?type=0", headers=headers)
data = req.json()
return jsonify(data)