"""Flask-RESTful resources for the traffic dashboard API.

The module exposes authentication endpoints, paginated listings, a set of
read-only SQL-backed chart endpoints, an ARIMA forecast endpoint and a proxy
to an external road-speed report.
"""
import logging
import random

import requests
from flask import jsonify, request
from flask_restful import Resource, marshal_with, fields, reqparse
from sqlalchemy import text

from App.exts import db
from App.models import User, CityCode, jam
from App.爬虫 import delayed, mock

logger = logging.getLogger(__name__)

# NOTE(review): 'password' is part of the marshalled user payload; confirm that
# no endpoint returns login_fields to clients before relying on it.
user_fields = {
    'id': fields.Integer,
    'nick_name': fields.String,
    'username': fields.String,
    'password': fields.String,
}

login_fields = {
    'status': fields.Integer,
    'msg': fields.String,
    'data': fields.Nested(user_fields),
}

void_fields = {
    'status': fields.Integer,
    'msg': fields.String,
}

# Request settings for the external road report used by jiashiData.
# NOTE(review): the cookie embeds a session id captured from a browser; it
# presumably expires and should come from configuration - confirm.
_AMAP_ROADS_URL = "https://report.amap.com/sanjiyisu/roads.do?type=0"
_AMAP_TIMEOUT_SECONDS = 10
_AMAP_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
    'Cookie': 'cna=jGJrHgCNgXIBASQOA6Fypcve; xlly_s=1; isg=BOnpxCMwqPzI-ZRIjwWwhSkd-JVDtt3oq4XyOovfz1BuUgxk0wQ-uRKLEPbkSnUg; tfstk=ekjvvAV7O7VmQWheroUl7K5BUCeuxiBqysWIjCAm5_CRMse2iCxcWfdR6E22jOOO6_APcIOMSh3O9sI0hh7q3a5hdmj0uhm90h-_tWq3x-W2bhMKMo6C3Z9LocV3xkXblK-GFW24T1Odmy0b0xancisMhyR6zF010YzBlB6XGtXcVoTmuTdRBOjXHTKfhaisfgpvkgzGxMIM2XApIqw8eVuwlLu9DgxbAl9rvLd3Hl3Z7UyyeB28wVuwlJJJt8fr7V8ly; user_unique_id=a1ac68b88d722bb9018e0cfc03e45d88; SESSION=074d5eba-96c3-4288-b3dc-3175a766ccdf',
}

_DISTINCT_CITY_CODES_SQL = "SELECT DISTINCT code FROM city_code"

# NOTE(review): '%Y-%d-%y' renders year-day-two-digit-year; '%Y-%m-%d' was
# probably intended. Left unchanged because clients may parse this value.
_CITY_WEEK_SQL = (
    "SELECT name, res, "
    "DATE_FORMAT(FROM_UNIXTIME(inert_time / 1000), '%Y-%d-%y') as time "
    "FROM city_week WHERE code = :code"
)

_FORECAST_SQL = (
    "SELECT create_time as name, round(avg(`idx`), 2) as value "
    "FROM prediction GROUP BY create_time ORDER BY create_time"
)
_FORECAST_DAYS = 5


def _json_error(message, status):
    """Return a JSON ``{"error": message}`` response with the given status.

    The status is set on the Response object itself. Returning
    ``(jsonify(...), status)`` from a Flask-RESTful method makes the framework
    try to JSON-serialise the Response object, which raises TypeError.
    """
    response = jsonify({"error": message})
    response.status_code = status
    return response


def _rows_as_dicts(result):
    """Convert a SQLAlchemy result into a list of ``{column: value}`` dicts."""
    columns = list(result.keys())
    return [dict(zip(columns, row)) for row in result]


def _select_json(sql, params=None):
    """Run a read-only SQL statement and return its rows as a JSON response.

    Args:
        sql: SQL text; bind parameters use the ``:name`` syntax.
        params: optional mapping of bind parameter values.

    Returns:
        A JSON list of row dicts, or ``{"error": ...}`` with status 500 when
        the statement fails.
    """
    try:
        result = db.session.execute(text(sql), params)
        return jsonify(_rows_as_dicts(result))
    except Exception as exc:  # endpoint boundary: report instead of crashing
        logger.exception("query failed: %s", sql)
        db.session.rollback()
        return _json_error(str(exc), 500)


def _random_city_code():
    """Return one randomly chosen code from city_code, or None if it is empty."""
    rows = db.session.execute(text(_DISTINCT_CITY_CODES_SQL))
    codes = [row[0] for row in rows]
    return random.choice(codes) if codes else None


def _page_window(data):
    """Translate a ``{'pagesize', 'page'}`` payload into ``(offset, limit)``.

    Values are coerced to int so numeric strings behave like numbers; the page
    is clamped to >= 1 and the size to >= 0 so the offset is never negative.

    Raises:
        KeyError: a key is missing.
        TypeError: the payload is not a mapping or a value is not int-like.
        ValueError: a value cannot be parsed as an integer.
    """
    limit = max(int(data['pagesize']), 0)
    page = max(int(data['page']), 1)
    return limit * (page - 1), limit


class loginResource(Resource):
    """Check a username/password pair against the User table."""

    @marshal_with(void_fields)
    def post(self):
        """Return a ``status``/``msg`` payload describing the login outcome.

        Expects a JSON body with ``username`` and ``password``.
        """
        data = request.get_json()
        username = data['username']
        password = data['password']

        if User.query.filter_by(username=username).first() is None:
            return {'status': 400, 'msg': '账号不正确或不存在'}

        # NOTE(review): the password is compared as stored, i.e. in plaintext;
        # hashing requires a data migration and is out of scope here.
        matched = User.query.filter_by(username=username, password=password).first()
        if matched is None:
            return {'status': 400, 'msg': '密码不正确'}
        return {'status': 200, 'msg': '登录成功'}


# Registration
class signResource(Resource):
    """Create a new user account."""

    @marshal_with(void_fields)
    def post(self):
        """Register ``username``/``password`` unless the username is taken."""
        parser = reqparse.RequestParser()
        parser.add_argument('username', type=str, required=True, help='Username is required')
        parser.add_argument('password', type=str, required=True, help='Password is required')
        args = parser.parse_args()

        if User.query.filter_by(username=args['username']).first():
            return {'status': 400, 'msg': '用户名已经存在'}

        db.session.add(User(username=args['username'], password=args['password']))
        db.session.commit()
        return {'status': 200, 'msg': '注册成功'}


# City list
class cityCode(Resource):
    """Paginated listing of CityCode rows."""

    def post(self):
        """Return one page of city codes.

        Expects a JSON body with integer ``pagesize`` and 1-based ``page``.
        """
        try:
            offset, limit = _page_window(request.get_json())
        except (KeyError, TypeError, ValueError):
            return _json_error("pagesize and page must be integers", 400)

        # NOTE(review): no ORDER BY is applied, so page contents depend on the
        # database's default row order.
        codes = CityCode.query.offset(offset).limit(limit).all()
        return [{'code': c.code, 'name': c.name, 'pinyin': c.pinyin} for c in codes]


class jamApi(Resource):
    """Paginated listing of jam rows."""

    def post(self):
        """Return one page of jam records.

        Expects a JSON body with integer ``pagesize`` and 1-based ``page``.
        """
        try:
            offset, limit = _page_window(request.get_json())
        except (KeyError, TypeError, ValueError):
            return _json_error("pagesize and page must be integers", 400)

        records = jam.query.offset(offset).limit(limit).all()
        return [
            {
                'id': r.id,
                'freeFlowSpeed': r.freeFlowSpeed,
                'realSpeed': r.realSpeed,
                'idxRatio': r.idxRatio,
                'idxRatioState': r.idxRatioState,
                'idx': r.idx,
                'label': r.label,
                'name': r.name,
                # create_time is stringified so the payload is JSON-serialisable.
                'create_time': str(r.create_time),
            }
            for r in records
        ]


class acquisitionApi(Resource):
    """Trigger the ``delayed`` routine imported from the crawler module."""

    def get(self):
        """Call ``delayed()`` and acknowledge with ``{'code': 200}``."""
        delayed()
        return {'code': 200}


class forecast(Resource):
    """Forecast the next days of the average ``idx`` with an ARIMA(1, 1, 1) model."""

    def get(self):
        """Return ``[[date, value], ...]`` for the five days after the last sample.

        Returns an empty list when the prediction table has no rows.
        """
        mock()

        # Imported lazily, as in the original, so these packages are only
        # loaded when this endpoint is called.
        import pandas as pd
        import pymysql
        from statsmodels.tsa.arima.model import ARIMA

        # NOTE(review): credentials are hard-coded; they should come from the
        # application configuration.
        conn = pymysql.connect(host='localhost', user='root', password='123456', db='bs_jaotong')
        try:
            with conn.cursor() as cur:
                cur.execute(_FORECAST_SQL)
                rows = cur.fetchall()
                columns = [desc[0] for desc in cur.description]
        finally:
            # Close the connection even when the query fails.
            conn.close()

        history = pd.DataFrame(list(rows), columns=columns)
        if history.empty:
            return jsonify([])

        # The query is ordered by create_time, so the last row is the newest.
        last_date = pd.to_datetime(history['name'].iloc[-1])
        future_days = pd.date_range(
            start=last_date + pd.Timedelta(days=1),
            periods=_FORECAST_DAYS,
            freq='D',
        ).strftime('%Y-%m-%d')

        # Fit on observed values only. Appending empty future rows before
        # fitting would push the forecast window past the days being labelled.
        observed = history['value'].astype(float)
        model_fit = ARIMA(observed, order=(1, 1, 1)).fit()
        predicted = model_fit.forecast(steps=_FORECAST_DAYS)

        return jsonify([[day, float(value)] for day, value in zip(future_days, predicted)])


# User list
class userPage(Resource):
    """List every row of sys_user."""

    def get(self):
        """Return all sys_user rows as a JSON list of dicts.

        NOTE(review): ``select *`` returns every column of the table; confirm
        no sensitive column (e.g. a password) is exposed to clients.
        """
        return _select_json("select * from sys_user ")


class delUser(Resource):
    """Delete a user by primary key."""

    def post(self):
        """Delete the user whose ``id`` is given in the JSON body.

        Returns ``{'code': 200, ...}`` on success and ``{'code': 400, ...}``
        when the user does not exist or the deletion fails.
        """
        try:
            user_id = request.get_json()['id']
            user = User.query.get(user_id)
            if user is None:
                return {'code': 400, 'msg': '删除失败,可能值不存在'}
            db.session.delete(user)
            db.session.commit()
        except Exception:  # endpoint boundary: keep the documented failure reply
            logger.exception("failed to delete user")
            db.session.rollback()
            return {'code': 400, 'msg': '删除失败,可能值不存在'}
        return {'code': 200, 'msg': '删除成功'}


class Top1(Resource):
    """First row of the jam table."""

    def get(self):
        """Return a JSON list holding at most one jam row."""
        return _select_json("SELECT * FROM jam LIMIT 1")


class CongestionCityOrder(Resource):
    """Ten jam rows ordered by free-flow speed, then real speed descending."""

    def get(self):
        """Return label, freeFlowSpeed and realSpeed for ten jam rows."""
        return _select_json(
            "SELECT label, freeFlowSpeed, realSpeed FROM jam "
            "ORDER BY freeFlowSpeed, realSpeed DESC LIMIT 10"
        )


class CongestionCityOrderList(Resource):
    """First ten rows of the jam table."""

    def get(self):
        """Return up to ten jam rows as JSON."""
        return _select_json("SELECT * FROM jam LIMIT 10")


class rankList(Resource):
    """First ten rows of the rank table."""

    def get(self):
        """Return up to ten rank rows as JSON."""
        # `rank` is backtick-quoted because RANK is a reserved word in MySQL 8.
        return _select_json("SELECT * FROM `rank` LIMIT 10")


class rankListView(Resource):
    """Ten rank rows with the highest ``idx``."""

    def get(self):
        """Return up to ten rank rows ordered by idx descending."""
        return _select_json("SELECT * FROM `rank` ORDER BY idx DESC LIMIT 10")


class rankListViewPie(Resource):
    """Row counts of the rank table grouped by ``healState``."""

    def get(self):
        """Return ``name``/``value`` pairs, one per healState."""
        return _select_json(
            "SELECT healState as name, count(*) as value "
            "FROM `rank` GROUP BY healState ORDER BY name"
        )


class headthView(Resource):
    """Indicators from countryindicators with their rounded ``avg`` column."""

    def get(self):
        """Return ``name``/``value`` pairs, one per countryindicators row."""
        return _select_json(
            "SELECT indicator as name, round(avg, 2) as value "
            "FROM countryindicators"
        )


class getTimeHeath(Resource):
    """city_week rows for one randomly chosen city code."""

    def get(self):
        """Return name, res and formatted time for a random city code.

        Responds with 404 when city_code is empty and 500 when a query fails.
        """
        try:
            code = _random_city_code()
            if code is None:
                return _json_error("No data found", 404)
            result = db.session.execute(text(_CITY_WEEK_SQL), {'code': int(code)})
            return jsonify(_rows_as_dicts(result))
        except Exception as exc:  # endpoint boundary, consistent with siblings
            logger.exception("city_week lookup failed")
            db.session.rollback()
            return _json_error(str(exc), 500)


class getRoadHeath(Resource):
    """city_road rows for one randomly chosen city code."""

    def get(self):
        """Return every city_road row of a random city code.

        Responds with 404 when city_code is empty and 500 when a query fails.
        """
        try:
            code = _random_city_code()
            if code is None:
                return _json_error("No city codes found", 404)
            road_rows = db.session.execute(
                text("SELECT * FROM city_road WHERE code = :code"),
                {"code": int(code)},
            )
            return jsonify(_rows_as_dicts(road_rows))
        except Exception as exc:  # endpoint boundary
            logger.exception("city_road lookup failed")
            db.session.rollback()
            return _json_error(str(exc), 500)


# Share of each road in the city congestion index
class cityListViewPie(Resource):
    """Average speed per road name, limited to twenty names."""

    def get(self):
        """Return ``name``/``value`` pairs from city_road."""
        return _select_json(
            "SELECT name, avg(speed) as value "
            "FROM city_road GROUP BY name LIMIT 20"
        )


class dataCount(Resource):
    """Total of TABLE_ROWS over the ``bs_jaotong`` schema."""

    def get(self):
        """Return ``[{"dataCount": total}]`` read from INFORMATION_SCHEMA.TABLES."""
        return _select_json(
            "SELECT sum(a.Rows) as dataCount FROM ( "
            "SELECT TABLE_NAME AS `Table`, TABLE_ROWS AS `Rows` "
            "FROM INFORMATION_SCHEMA.TABLES "
            "WHERE TABLE_SCHEMA = 'bs_jaotong' "
            ") a"
        )


class jiashiData(Resource):
    """Proxy for the external road report JSON."""

    def get(self):
        """Fetch the upstream report and relay its JSON body.

        Responds with 500 and an ``error`` payload when the upstream call
        fails, times out or does not return JSON.
        """
        try:
            upstream = requests.get(
                url=_AMAP_ROADS_URL,
                headers=_AMAP_HEADERS,
                # Without a timeout a stalled upstream blocks the worker forever.
                timeout=_AMAP_TIMEOUT_SECONDS,
            )
            return jsonify(upstream.json())
        except (requests.RequestException, ValueError) as exc:
            logger.exception("upstream road report request failed")
            return _json_error(str(exc), 500)