# views.py 路由 + 视图函数 import os import pymysql from flask import request, url_for, jsonify from flask import Blueprint import hashlib from math import ceil from sqlalchemy import and_ from .utils.api_utils import APIUtils from .models import * blus = Blueprint("user", __name__) db_config = { 'host': '192.168.15.2', 'user': 'root', 'password': 'minxianrui', 'database': 'fraud_detection_ml', 'charset': 'utf8mb4' } # 注册 @blus.route('/api/register', methods=['POST']) def user_register(): required_fields = ['username', 'password'] is_valid, message = APIUtils.validate_json(request.json, required_fields) if not is_valid: return APIUtils.error_response(message, status_code=400) username = request.json['username'] password = request.json['password'] # 检查用户名是否已存在 existing_user = User.query.filter_by(username=username).first() if existing_user: return APIUtils.error_response("用户名已经存在!", status_code=400) # 哈希处理密码 hashed_password = hashlib.sha256(password.encode()).hexdigest() # 创建新用户 new_user = User(username=username, password=hashed_password,role=1) db.session.add(new_user) db.session.commit() return APIUtils.success_response(message="登录成功!") @blus.route('/api/login', methods=['POST']) def user_login(): required_fields = ['username', 'password'] is_valid, message = APIUtils.validate_json(request.json, required_fields) if not is_valid: return APIUtils.error_response(message, status_code=400) username = request.json['username'] password = request.json['password'] if username == "" or password == "": return APIUtils.error_response("用户名或密码不能为空!", status_code=400) user = User.query.filter_by(username=username).first() if user is None: return APIUtils.error_response("用户名错误或不存在!", status_code=401) hashed_password = hashlib.sha256(password.encode()).hexdigest() if hashed_password != user.password: return APIUtils.error_response("密码错误或不存在!", status_code=401) return APIUtils.success_response(data={'token': user.id, 'username': user.username,'role':user.role}, message="登录成功!") @blus.route('/change_password', methods=['POST']) def change_password(): required_fields = ['username', 'old_password', 'new_password'] is_valid, message = APIUtils.validate_json(request.json, required_fields) if not is_valid: return APIUtils.error_response(message, status_code=400) username = request.json['username'] old_password = request.json['old_password'] new_password = request.json['new_password'] user = User.query.filter_by(username=username).first() if user is None: return APIUtils.error_response("用户不存在!", status_code=404) hashed_old_password = hashlib.sha256(old_password.encode()).hexdigest() if hashed_old_password != user.password: return APIUtils.error_response("旧密码错误!", status_code=401) # 哈希处理新密码 hashed_new_password = hashlib.sha256(new_password.encode()).hexdigest() user.password = hashed_new_password db.session.commit() return APIUtils.success_response(message="密码修改成功!") @blus.route('/api/user/del/', methods=['DELETE']) def delete_user(user_id): # 根据用户 ID 查询用户 user = User.query.get(user_id) if user is None: return APIUtils.error_response("用户不存在!", status_code=404) # 检查是否为 admin 用户 if user.username.lower() == 'admin': return APIUtils.error_response("无法删除管理员账户!", status_code=403) # 删除用户 db.session.delete(user) db.session.commit() return APIUtils.success_response(message="用户删除成功!") # 用户管理 @blus.route('/api/users/page', methods=['GET']) def get_users(): # 获取分页参数,默认为第 1 页,每页 10 条记录 page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) # 获取 username 参数,如果没有则为 None username = request.args.get('username', type=str) # 构建查询,先查询所有用户 query = User.query # 如果提供了 username,则根据 username 进行筛选 if username: query = query.filter(User.username.like(f'%{username}%')) # 执行分页查询 users_pagination = query.paginate(page=page, per_page=per_page, error_out=False) # 获取用户数据 users = users_pagination.items # 将用户数据转为 JSON 格式 users_list = [] for user in users: users_list.append({ 'id': user.id, 'username': user.username, 'password': user.password, 'role': user.role, # 其他需要返回的字段 }) # 构建响应数据,包括分页信息 response = { 'list': users_list, 'page': { 'total': users_pagination.total, # 总记录数 'page': users_pagination.page, # 当前页码 'limit': users_pagination.per_page # 每页记录数 } } return APIUtils.success_response(data=response, message="获取用户列表成功") # 增:添加新的交易记录 @blus.route('/api/transactions', methods=['POST']) def add_transaction(): data = request.get_json() new_transaction = FinancialTransaction( user_id=data['user_id'], transaction_amount=data['transaction_amount'], transaction_time=data['transaction_time'], transaction_location=data.get('transaction_location', ''), device_info=data.get('device_info', ''), ip_address=data.get('ip_address', ''), browser_info=data.get('browser_info', ''), is_fraud=data['is_fraud'] ) db.session.add(new_transaction) db.session.commit() return APIUtils.success_response(data=jsonify(new_transaction.to_dict()), message="成功") # 查:获取所有交易记录 @blus.route('/api/transactions', methods=['GET']) def get_transactions(): # 获取分页参数,设置默认值 page = request.args.get('page', 1, type=int) # 默认第一页 page_size = request.args.get('page_size', 10, type=int) # 默认每页10条 query = FinancialTransaction.query transactionStatus = request.args.get('transactionStatus') status = request.args.get('status') if transactionStatus and status: query = query.filter(and_( FinancialTransaction.is_fraud.like(f'%{transactionStatus}%'), FinancialTransaction.status.like(f'%{status}%') )) elif transactionStatus: query = query.filter(FinancialTransaction.is_fraud.like(f'%{transactionStatus}%')) elif status: query = query.filter(FinancialTransaction.status.like(f'%{status}%')) # 计算分页偏移量 offset = (page - 1) * page_size # 查询交易记录,使用 limit 和 offset 实现分页 transactions = query.offset(offset).limit(page_size).all() # 获取总记录数,用于计算总页数 total_count = query.count() total_pages = ceil(total_count / page_size) # 构建响应数据,包括分页信息 response = { 'data': [transaction.to_dict() for transaction in transactions], 'page': { "current_page": page, "page_size": page_size, "total_count": total_count, "total_pages": total_pages } } # 返回分页数据,包括当前页的记录和总信息 return APIUtils.success_response( data=response, message="成功", ) # 查:获取单个交易记录 @blus.route('/api/transactions/', methods=['GET']) def get_transaction(transaction_id): transaction = FinancialTransaction.query.get(transaction_id) if transaction is None: return jsonify({'message': 'Transaction not found'}), 404 return jsonify(transaction.to_dict()) # 改:更新交易记录 @blus.route('/api/transactions/', methods=['PUT']) def update_transaction(transaction_id): transaction = FinancialTransaction.query.get(transaction_id) if transaction is None: return jsonify({'message': 'Transaction not found'}), 404 data = request.get_json() transaction.user_id = data.get('user_id', transaction.user_id) transaction.transaction_amount = data.get('transaction_amount', transaction.transaction_amount) transaction.transaction_time = data.get('transaction_time', transaction.transaction_time) transaction.transaction_location = data.get('transaction_location', transaction.transaction_location) transaction.device_info = data.get('device_info', transaction.device_info) transaction.ip_address = data.get('ip_address', transaction.ip_address) transaction.browser_info = data.get('browser_info', transaction.browser_info) transaction.is_fraud = data.get('is_fraud', transaction.is_fraud) db.session.commit() return jsonify(transaction.to_dict()) # 改:更新交易记录 @blus.route('/api/utransactions/', methods=['PUT']) def update_transaction1(transaction_id): transaction = FinancialTransaction.query.get(transaction_id) print(transaction_id) transaction.status = 1 db.session.commit() return jsonify(transaction.to_dict()) # 删:删除交易记录 @blus.route('/api/transactions/', methods=['DELETE']) def delete_transaction(transaction_id): transaction = FinancialTransaction.query.get(transaction_id) if transaction is None: return jsonify({'message': 'Transaction not found'}), 404 db.session.delete(transaction) db.session.commit() return jsonify({'message': 'Transaction deleted'}), 200 # SQL查询 @blus.route('/api/mysql', methods=['POST']) def mysql(): data = request.get_json() # 检查 SQL 参数是否存在 if not data['sql']: return APIUtils.error_response(message="没有sql参数") try: # 连接数据库 connection = pymysql.connect(**db_config) with connection.cursor(pymysql.cursors.DictCursor) as cursor: # 自定义 SQL 查询 cursor.execute(data['sql']) # 获取查询结果 results = cursor.fetchall() return results except pymysql.MySQLError as e: return APIUtils.error_response(message=f"数据库连接失败:{str(e)}") except Exception as e: return APIUtils.error_response(message=f"查询执行失败:{str(e)}")