fraud-detection-ml/App/views.py
2025-02-26 22:25:44 +08:00

290 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# views.py 路由 + 视图函数
import os
import pymysql
from flask import request, url_for, jsonify
from flask import Blueprint
import hashlib
from math import ceil
from sqlalchemy import and_
from .utils.api_utils import APIUtils
from .models import *
# Flask blueprint named "user"; the route decorators below attach their
# view functions to it.
blus = Blueprint("user", __name__)
# Keyword arguments for the raw PyMySQL connection opened by the ad-hoc SQL
# endpoint (``pymysql.connect(**db_config)``).
#
# Every value can be overridden with an environment variable so that
# credentials do not have to live in source control. The literals are kept
# only as backward-compatible defaults.
# NOTE(review): the fallback still embeds the root password -- rotate it and
# drop the default once deployments set FRAUD_DB_PASSWORD.
db_config = {
    'host': os.environ.get('FRAUD_DB_HOST', '192.168.15.2'),
    'user': os.environ.get('FRAUD_DB_USER', 'root'),
    'password': os.environ.get('FRAUD_DB_PASSWORD', 'minxianrui'),
    'database': os.environ.get('FRAUD_DB_NAME', 'fraud_detection_ml'),
    'charset': os.environ.get('FRAUD_DB_CHARSET', 'utf8mb4'),
}
# Registration
@blus.route('/api/register', methods=['POST'])
def user_register():
    """Create a new user from a JSON body holding ``username`` and ``password``.

    Returns:
        An ``APIUtils`` error response with status 400 when the payload fails
        ``APIUtils.validate_json``, when either field is an empty string, or
        when the username is already taken. Otherwise the user is stored with
        ``role=1`` and an ``APIUtils`` success response is returned.
    """
    required_fields = ['username', 'password']
    is_valid, message = APIUtils.validate_json(request.json, required_fields)
    if not is_valid:
        return APIUtils.error_response(message, status_code=400)

    username = request.json['username']
    password = request.json['password']

    # The login endpoint rejects empty credentials, so an account created
    # with one could never be used; refuse it here with the same message.
    if username == "" or password == "":
        return APIUtils.error_response("用户名或密码不能为空!", status_code=400)

    # Reject duplicate usernames.
    existing_user = User.query.filter_by(username=username).first()
    if existing_user:
        return APIUtils.error_response("用户名已经存在!", status_code=400)

    # NOTE(review): unsalted SHA-256 is weak for password storage. It is kept
    # because login and change_password compare against the same digest
    # format; switching to a salted KDF needs a migration of stored hashes.
    hashed_password = hashlib.sha256(password.encode()).hexdigest()

    new_user = User(username=username, password=hashed_password, role=1)
    db.session.add(new_user)
    db.session.commit()

    # The original message said "login succeeded" here; report registration.
    return APIUtils.success_response(message="注册成功!")
@blus.route('/api/login', methods=['POST'])
def user_login():
    """Check a username/password pair and return the matching user's details.

    Returns:
        A 400 error response when the payload is invalid or a credential is
        empty, a 401 error response when the user is unknown or the password
        digest does not match, otherwise a success response whose data holds
        ``token`` (the user's id), ``username`` and ``role``.
    """
    payload_ok, problem = APIUtils.validate_json(
        request.json, ['username', 'password']
    )
    if not payload_ok:
        return APIUtils.error_response(problem, status_code=400)

    body = request.json
    username, password = body['username'], body['password']
    if "" in (username, password):
        return APIUtils.error_response("用户名或密码不能为空!", status_code=400)

    account = User.query.filter_by(username=username).first()
    if account is None:
        return APIUtils.error_response("用户名错误或不存在!", status_code=401)

    # Stored passwords are SHA-256 hex digests, so compare digest to digest.
    digest = hashlib.sha256(password.encode()).hexdigest()
    if digest != account.password:
        return APIUtils.error_response("密码错误或不存在!", status_code=401)

    # NOTE(review): the "token" handed to the client is just the user's id.
    profile = {
        'token': account.id,
        'username': account.username,
        'role': account.role,
    }
    return APIUtils.success_response(data=profile, message="登录成功!")
@blus.route('/change_password', methods=['POST'])
def change_password():
    """Replace a user's password after verifying the current one.

    Expects a JSON body with ``username``, ``old_password`` and
    ``new_password``. Returns a 400 error response for an invalid payload,
    404 when the user does not exist, 401 when the old password does not
    match, and a success response once the new digest has been committed.
    """

    def sha256_hex(text):
        # Passwords are stored as SHA-256 hex digests.
        return hashlib.sha256(text.encode()).hexdigest()

    payload_ok, problem = APIUtils.validate_json(
        request.json, ['username', 'old_password', 'new_password']
    )
    if not payload_ok:
        return APIUtils.error_response(problem, status_code=400)

    body = request.json
    username = body['username']
    old_password = body['old_password']
    new_password = body['new_password']

    account = User.query.filter_by(username=username).first()
    if account is None:
        return APIUtils.error_response("用户不存在!", status_code=404)

    if sha256_hex(old_password) != account.password:
        return APIUtils.error_response("旧密码错误!", status_code=401)

    # Store the digest of the new password, never the plain text.
    account.password = sha256_hex(new_password)
    db.session.commit()
    return APIUtils.success_response(message="密码修改成功!")
@blus.route('/api/user/del/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete the user identified by ``user_id``.

    Returns a 404 error response when no such user exists and a 403 error
    response when the username is ``admin`` (case-insensitive); otherwise the
    row is deleted and a success response is returned.
    """
    target = User.query.get(user_id)
    if target is None:
        return APIUtils.error_response("用户不存在!", status_code=404)

    # The administrator account is protected from deletion.
    is_admin = target.username.lower() == 'admin'
    if is_admin:
        return APIUtils.error_response("无法删除管理员账户!", status_code=403)

    db.session.delete(target)
    db.session.commit()
    return APIUtils.success_response(message="用户删除成功!")
# User management
@blus.route('/api/users/page', methods=['GET'])
def get_users():
    """Return one page of users, optionally filtered by username substring.

    Query parameters:
        page: page number, defaults to 1.
        per_page: rows per page, defaults to 10.
        username: optional; when given, only users whose username matches
            ``LIKE '%<username>%'`` are returned.

    Returns:
        A success response whose data holds ``list`` (the user rows) and
        ``page`` (``total``, ``page`` and ``limit``).
    """
    args = request.args
    page = args.get('page', 1, type=int)
    per_page = args.get('per_page', 10, type=int)
    name_filter = args.get('username', type=str)

    # Start from all users and narrow down only when a filter was supplied.
    query = User.query
    if name_filter:
        query = query.filter(User.username.like(f'%{name_filter}%'))

    pagination = query.paginate(page=page, per_page=per_page, error_out=False)

    # NOTE(review): the stored password digest is included in every row.
    rows = [
        {
            'id': member.id,
            'username': member.username,
            'password': member.password,
            'role': member.role,
        }
        for member in pagination.items
    ]

    payload = {
        'list': rows,
        'page': {
            'total': pagination.total,     # total number of matching rows
            'page': pagination.page,       # current page number
            'limit': pagination.per_page,  # rows per page
        },
    }
    return APIUtils.success_response(data=payload, message="获取用户列表成功")
# Create: add a new transaction record
@blus.route('/api/transactions', methods=['POST'])
def add_transaction():
    """Insert a ``FinancialTransaction`` built from the JSON request body.

    Required keys: ``user_id``, ``transaction_amount``, ``transaction_time``
    and ``is_fraud``. Optional keys (default ``''``): ``transaction_location``,
    ``device_info``, ``ip_address`` and ``browser_info``.

    Returns:
        A 400 error response when the body is not a JSON object or a required
        key is missing; otherwise a success response whose data is the new
        row's ``to_dict()``.
    """
    # silent=True returns None rather than raising on a missing/invalid body,
    # so the failure can be reported through the usual error envelope.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return APIUtils.error_response("请求体必须是 JSON 对象", status_code=400)

    # Test key presence, not truthiness: 0 / False are legitimate values
    # for fields such as is_fraud.
    required_fields = ('user_id', 'transaction_amount', 'transaction_time', 'is_fraud')
    missing = [field for field in required_fields if field not in data]
    if missing:
        return APIUtils.error_response(
            f"缺少必要参数: {', '.join(missing)}", status_code=400
        )

    new_transaction = FinancialTransaction(
        user_id=data['user_id'],
        transaction_amount=data['transaction_amount'],
        transaction_time=data['transaction_time'],
        transaction_location=data.get('transaction_location', ''),
        device_info=data.get('device_info', ''),
        ip_address=data.get('ip_address', ''),
        browser_info=data.get('browser_info', ''),
        is_fraud=data['is_fraud'],
    )
    db.session.add(new_transaction)
    db.session.commit()

    # Pass the plain dict, as the other endpoints do. The original wrapped it
    # in jsonify(), handing a Response object to success_response as data.
    return APIUtils.success_response(data=new_transaction.to_dict(), message="成功")
# Read: list transaction records (paginated)
@blus.route('/api/transactions', methods=['GET'])
def get_transactions():
    """Return one page of transactions, optionally filtered.

    Query parameters:
        page: page number, defaults to 1; values below 1 are treated as 1.
        page_size: rows per page, defaults to 10; values below 1 are treated
            as 1.
        transactionStatus: optional; matched against ``is_fraud`` with
            ``LIKE '%value%'``.
        status: optional; matched against ``status`` with ``LIKE '%value%'``.

    Returns:
        A success response whose data holds ``data`` (rows as dicts) and
        ``page`` (``current_page``, ``page_size``, ``total_count``,
        ``total_pages``).
    """
    # Clamp both values: page_size == 0 would divide by zero below, and a
    # negative page or size would produce a negative OFFSET/LIMIT.
    page = max(request.args.get('page', 1, type=int), 1)
    page_size = max(request.args.get('page_size', 10, type=int), 1)

    query = FinancialTransaction.query
    transaction_status = request.args.get('transactionStatus')
    status = request.args.get('status')

    # Successive .filter() calls are AND-ed together, so each optional filter
    # can be applied on its own.
    if transaction_status:
        query = query.filter(
            FinancialTransaction.is_fraud.like(f'%{transaction_status}%')
        )
    if status:
        query = query.filter(FinancialTransaction.status.like(f'%{status}%'))

    # Count the filtered set first, then fetch only the requested slice.
    total_count = query.count()
    total_pages = ceil(total_count / page_size)
    offset = (page - 1) * page_size
    transactions = query.offset(offset).limit(page_size).all()

    response = {
        'data': [transaction.to_dict() for transaction in transactions],
        'page': {
            "current_page": page,
            "page_size": page_size,
            "total_count": total_count,
            "total_pages": total_pages,
        },
    }
    return APIUtils.success_response(
        data=response,
        message="成功",
    )
# Read: fetch a single transaction record
@blus.route('/api/transactions/<int:transaction_id>', methods=['GET'])
def get_transaction(transaction_id):
    """Return the transaction with the given id as JSON, or a 404 message."""
    record = FinancialTransaction.query.get(transaction_id)
    if record is not None:
        return jsonify(record.to_dict())
    return jsonify({'message': 'Transaction not found'}), 404
# Update: modify a transaction record
@blus.route('/api/transactions/<int:transaction_id>', methods=['PUT'])
def update_transaction(transaction_id):
    """Update the editable fields of one transaction from the JSON body.

    Only keys present in the body are changed; all other columns keep their
    current value.

    Returns:
        ``404`` with a message when the transaction does not exist, ``400``
        with a message when the body is not a JSON object, otherwise the
        updated row as JSON.
    """
    transaction = FinancialTransaction.query.get(transaction_id)
    if transaction is None:
        return jsonify({'message': 'Transaction not found'}), 404

    # A body such as `null` or a list would make data.get() raise; report it
    # as a client error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Invalid JSON body'}), 400

    updatable_fields = (
        'user_id',
        'transaction_amount',
        'transaction_time',
        'transaction_location',
        'device_info',
        'ip_address',
        'browser_info',
        'is_fraud',
    )
    for field in updatable_fields:
        if field in data:
            setattr(transaction, field, data[field])

    db.session.commit()
    return jsonify(transaction.to_dict())
# Update: set a transaction's status flag
@blus.route('/api/utransactions/<int:transaction_id>', methods=['PUT'])
def update_transaction1(transaction_id):
    """Set ``status`` to 1 on the given transaction and return it as JSON.

    Returns ``404`` with a message when the transaction does not exist.
    NOTE(review): what status value 1 means is not visible in this module.
    """
    transaction = FinancialTransaction.query.get(transaction_id)
    # Without this guard an unknown id raised AttributeError on None; answer
    # 404 like the other single-transaction endpoints.
    if transaction is None:
        return jsonify({'message': 'Transaction not found'}), 404

    transaction.status = 1
    db.session.commit()
    return jsonify(transaction.to_dict())
# Delete: remove a transaction record
@blus.route('/api/transactions/<int:transaction_id>', methods=['DELETE'])
def delete_transaction(transaction_id):
    """Delete the transaction with the given id.

    Returns a JSON message with status 200 on success, or 404 when no such
    transaction exists.
    """
    record = FinancialTransaction.query.get(transaction_id)
    if record is not None:
        db.session.delete(record)
        db.session.commit()
        return jsonify({'message': 'Transaction deleted'}), 200
    return jsonify({'message': 'Transaction not found'}), 404
# Raw SQL query
@blus.route('/api/mysql', methods=['POST'])
def mysql():
    """Execute the SQL statement given in the JSON body and return its rows.

    The body must be a JSON object with a non-empty ``sql`` key. The statement
    is run over a fresh PyMySQL connection built from ``db_config`` using a
    ``DictCursor``.

    SECURITY(review): the statement is caller-supplied and executed verbatim
    with the credentials in ``db_config``. It cannot be parameterized because
    the whole statement is the input. This endpoint should be limited to
    trusted callers, or removed.

    Returns:
        The fetched rows as a JSON array, or an ``APIUtils`` error response
        when the ``sql`` parameter is missing or the database call fails.
    """
    # Tolerate a missing body or missing key; the original indexed data['sql']
    # directly, outside the try block.
    data = request.get_json(silent=True)
    sql = data.get('sql') if isinstance(data, dict) else None
    if not sql:
        return APIUtils.error_response(message="没有sql参数")

    connection = None
    try:
        connection = pymysql.connect(**db_config)
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()
        # Always send a JSON array: the driver may return a tuple (e.g. for an
        # empty result set), which is not a valid bare view return value.
        return jsonify(list(results))
    except pymysql.MySQLError as e:
        return APIUtils.error_response(message=f"数据库连接失败:{str(e)}")
    except Exception as e:
        return APIUtils.error_response(message=f"查询执行失败:{str(e)}")
    finally:
        # The original never closed the connection; release it on every path.
        if connection is not None:
            connection.close()