290 lines
10 KiB
Python
290 lines
10 KiB
Python
# views.py 路由 + 视图函数
|
||
import os
|
||
|
||
import pymysql
|
||
from flask import request, url_for, jsonify
|
||
from flask import Blueprint
|
||
import hashlib
|
||
from math import ceil
|
||
|
||
from sqlalchemy import and_
|
||
|
||
from .utils.api_utils import APIUtils
|
||
from .models import *
|
||
# Blueprint named "user"; the route handlers in this module register on it.
blus = Blueprint("user", __name__)
|
||
# Connection settings for the raw pymysql connection opened by the /api/mysql
# endpoint. Each credential can be overridden through an environment variable
# so secrets do not have to live in source control; the literals are only
# fallbacks that keep the previous hard-coded behaviour when nothing is set.
# NOTE(review): the fallback still embeds a root password - rotate it and drop
# the default once deployments provide FRAUD_DB_PASSWORD.
db_config = {
    'host': os.environ.get('FRAUD_DB_HOST', '192.168.15.2'),
    'user': os.environ.get('FRAUD_DB_USER', 'root'),
    'password': os.environ.get('FRAUD_DB_PASSWORD', 'minxianrui'),
    'database': os.environ.get('FRAUD_DB_NAME', 'fraud_detection_ml'),
    'charset': 'utf8mb4',
}
|
||
# Registration
@blus.route('/api/register', methods=['POST'])
def user_register():
    """Create a new user account from a JSON request body.

    The body must contain ``username`` and ``password``. Responds with HTTP
    400 when validation fails, when either value is an empty string, or when
    the username is already taken. Otherwise the user is stored with
    ``role=1`` and a success response is returned.
    """
    required_fields = ['username', 'password']
    is_valid, message = APIUtils.validate_json(request.json, required_fields)
    if not is_valid:
        return APIUtils.error_response(message, status_code=400)

    username = request.json['username']
    password = request.json['password']

    # Same guard the login endpoint applies; without it an account with an
    # empty name or password could be created but never used to log in.
    if username == "" or password == "":
        return APIUtils.error_response("用户名或密码不能为空!", status_code=400)

    # Reject duplicates before inserting.
    existing_user = User.query.filter_by(username=username).first()
    if existing_user:
        return APIUtils.error_response("用户名已经存在!", status_code=400)

    # NOTE(review): unsalted SHA-256 is weak for password storage. The scheme
    # is left unchanged because login and change_password compare against the
    # same digest; replacing it requires migrating the stored hashes.
    hashed_password = hashlib.sha256(password.encode()).hexdigest()

    new_user = User(username=username, password=hashed_password, role=1)
    db.session.add(new_user)
    db.session.commit()

    # The original message said "login succeeded" on a registration.
    return APIUtils.success_response(message="注册成功!")
|
||
@blus.route('/api/login', methods=['POST'])
def user_login():
    """Check a username/password pair and return the user's basic details.

    Responds with HTTP 400 when the body fails validation or a credential is
    an empty string, and HTTP 401 when the user is unknown or the SHA-256
    digest of the password does not match the stored value.
    """
    payload = request.json
    ok, problem = APIUtils.validate_json(payload, ['username', 'password'])
    if not ok:
        return APIUtils.error_response(problem, status_code=400)

    username = payload['username']
    password = payload['password']
    if "" in (username, password):
        return APIUtils.error_response("用户名或密码不能为空!", status_code=400)

    account = User.query.filter_by(username=username).first()
    if account is None:
        return APIUtils.error_response("用户名错误或不存在!", status_code=401)

    supplied_digest = hashlib.sha256(password.encode()).hexdigest()
    if supplied_digest != account.password:
        return APIUtils.error_response("密码错误或不存在!", status_code=401)

    # NOTE(review): the "token" is simply the user's primary key.
    session_info = {
        'token': account.id,
        'username': account.username,
        'role': account.role,
    }
    return APIUtils.success_response(data=session_info, message="登录成功!")
|
||
|
||
@blus.route('/change_password', methods=['POST'])
def change_password():
    """Replace a user's password after verifying the old one.

    The JSON body must contain ``username``, ``old_password`` and
    ``new_password``. Responds with HTTP 400 on validation failure, 404 when
    the user does not exist and 401 when the old password does not match.
    """
    def _sha256_hex(text):
        # Same digest format the stored password column is compared against.
        return hashlib.sha256(text.encode()).hexdigest()

    body = request.json
    ok, problem = APIUtils.validate_json(
        body, ['username', 'old_password', 'new_password']
    )
    if not ok:
        return APIUtils.error_response(problem, status_code=400)

    username = body['username']
    old_password = body['old_password']
    new_password = body['new_password']

    account = User.query.filter_by(username=username).first()
    if account is None:
        return APIUtils.error_response("用户不存在!", status_code=404)

    if _sha256_hex(old_password) != account.password:
        return APIUtils.error_response("旧密码错误!", status_code=401)

    # Store the digest of the new password and persist the change.
    account.password = _sha256_hex(new_password)
    db.session.commit()
    return APIUtils.success_response(message="密码修改成功!")
|
||
|
||
|
||
@blus.route('/api/user/del/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete the user with the given id.

    Responds with HTTP 404 when no such user exists and HTTP 403 when the
    user's name is ``admin`` (compared case-insensitively).
    """
    target = User.query.get(user_id)
    if target is None:
        return APIUtils.error_response("用户不存在!", status_code=404)

    # The administrator account must never be removed.
    is_admin_account = target.username.lower() == 'admin'
    if is_admin_account:
        return APIUtils.error_response("无法删除管理员账户!", status_code=403)

    session = db.session
    session.delete(target)
    session.commit()
    return APIUtils.success_response(message="用户删除成功!")
|
||
|
||
# User management
@blus.route('/api/users/page', methods=['GET'])
def get_users():
    """Return one page of users, optionally filtered by username substring.

    Query parameters: ``page`` (default 1), ``per_page`` (default 10) and an
    optional ``username`` matched with SQL ``LIKE '%<username>%'``.
    """
    args = request.args
    page_number = args.get('page', 1, type=int)
    page_limit = args.get('per_page', 10, type=int)
    name_filter = args.get('username', type=str)

    # Start from all users and narrow only when a filter was supplied.
    user_query = User.query
    if name_filter:
        user_query = user_query.filter(User.username.like(f'%{name_filter}%'))

    pagination = user_query.paginate(
        page=page_number, per_page=page_limit, error_out=False
    )

    # NOTE(review): the stored password hash is returned for every user;
    # confirm the client really needs it before keeping this field.
    rows = [
        {
            'id': member.id,
            'username': member.username,
            'password': member.password,
            'role': member.role,
        }
        for member in pagination.items
    ]

    payload = {
        'list': rows,
        'page': {
            'total': pagination.total,     # total number of matching rows
            'page': pagination.page,       # current page number
            'limit': pagination.per_page,  # rows per page
        },
    }
    return APIUtils.success_response(data=payload, message="获取用户列表成功")
|
||
|
||
|
||
# Create: add a new transaction record
@blus.route('/api/transactions', methods=['POST'])
def add_transaction():
    """Insert a new ``FinancialTransaction`` from a JSON request body.

    Required keys: ``user_id``, ``transaction_amount``, ``transaction_time``
    and ``is_fraud``. Optional keys (``transaction_location``,
    ``device_info``, ``ip_address``, ``browser_info``) default to ``''``.
    Responds with HTTP 400 when the body is not a JSON object or a required
    key is missing; otherwise returns the stored record via ``to_dict()``.
    """
    data = request.get_json()

    # Fail with a 400 instead of a KeyError/TypeError (HTTP 500) on bad input.
    if not isinstance(data, dict):
        return APIUtils.error_response("请求体必须是JSON对象", status_code=400)
    required_fields = ('user_id', 'transaction_amount', 'transaction_time', 'is_fraud')
    missing = [field for field in required_fields if field not in data]
    if missing:
        return APIUtils.error_response(
            f"缺少必要字段: {', '.join(missing)}", status_code=400
        )

    new_transaction = FinancialTransaction(
        user_id=data['user_id'],
        transaction_amount=data['transaction_amount'],
        transaction_time=data['transaction_time'],
        transaction_location=data.get('transaction_location', ''),
        device_info=data.get('device_info', ''),
        ip_address=data.get('ip_address', ''),
        browser_info=data.get('browser_info', ''),
        is_fraud=data['is_fraud'],
    )
    db.session.add(new_transaction)
    db.session.commit()

    # Pass the plain dict, as the other handlers do; the original wrapped it
    # in jsonify(), handing a Response object to success_response as data.
    return APIUtils.success_response(data=new_transaction.to_dict(), message="成功")
|
||
|
||
# Read: list transaction records
@blus.route('/api/transactions', methods=['GET'])
def get_transactions():
    """Return one page of transactions with optional LIKE filters.

    Query parameters:
        page: 1-based page number (default 1; values below 1 are treated as 1).
        page_size: rows per page (default 10; values below 1 are treated as 1).
        transactionStatus: substring matched against ``is_fraud``.
        status: substring matched against ``status``.

    The response data holds the page's records under ``data`` and the paging
    details (current page, page size, total count, total pages) under ``page``.
    """
    # Clamp both values: page_size=0 used to raise ZeroDivisionError in the
    # total_pages computation, and page<1 produced a negative OFFSET.
    page = max(request.args.get('page', 1, type=int), 1)
    page_size = max(request.args.get('page_size', 10, type=int), 1)

    transaction_status = request.args.get('transactionStatus')
    status = request.args.get('status')

    # Chained filter() calls are ANDed together, so applying each filter
    # independently covers the "both", "either" and "neither" cases.
    query = FinancialTransaction.query
    if transaction_status:
        query = query.filter(FinancialTransaction.is_fraud.like(f'%{transaction_status}%'))
    if status:
        query = query.filter(FinancialTransaction.status.like(f'%{status}%'))

    # NOTE(review): no ORDER BY is applied, so which rows land on which page
    # depends on the database's default ordering.
    offset = (page - 1) * page_size
    transactions = query.offset(offset).limit(page_size).all()

    # The total row count drives the page count reported to the client.
    total_count = query.count()
    total_pages = ceil(total_count / page_size)

    response = {
        'data': [transaction.to_dict() for transaction in transactions],
        'page': {
            "current_page": page,
            "page_size": page_size,
            "total_count": total_count,
            "total_pages": total_pages,
        },
    }
    return APIUtils.success_response(
        data=response,
        message="成功",
    )
|
||
# Read: fetch a single transaction record
@blus.route('/api/transactions/<int:transaction_id>', methods=['GET'])
def get_transaction(transaction_id):
    """Return the transaction with the given id as JSON, or HTTP 404."""
    record = FinancialTransaction.query.get(transaction_id)
    if record is not None:
        return jsonify(record.to_dict())
    return jsonify({'message': 'Transaction not found'}), 404
|
||
|
||
|
||
# Update: modify an existing transaction record
@blus.route('/api/transactions/<int:transaction_id>', methods=['PUT'])
def update_transaction(transaction_id):
    """Apply the fields present in the JSON body to an existing transaction.

    Keys absent from the body keep their current value. Responds with HTTP
    404 when the transaction does not exist; otherwise returns the updated
    record via ``to_dict()``.
    """
    record = FinancialTransaction.query.get(transaction_id)
    if record is None:
        return jsonify({'message': 'Transaction not found'}), 404

    changes = request.get_json()
    updatable_fields = (
        'user_id',
        'transaction_amount',
        'transaction_time',
        'transaction_location',
        'device_info',
        'ip_address',
        'browser_info',
        'is_fraud',
    )
    for field_name in updatable_fields:
        # Fall back to the current value when the body omits the key.
        current_value = getattr(record, field_name)
        setattr(record, field_name, changes.get(field_name, current_value))

    db.session.commit()
    return jsonify(record.to_dict())
|
||
|
||
|
||
|
||
# Update: set a transaction's status to 1
@blus.route('/api/utransactions/<int:transaction_id>', methods=['PUT'])
def update_transaction1(transaction_id):
    """Set ``status = 1`` on the given transaction and return it as JSON.

    Responds with HTTP 404, in the same shape as the other transaction
    handlers, when the transaction does not exist.
    """
    transaction = FinancialTransaction.query.get(transaction_id)

    # Previously a missing row raised AttributeError (HTTP 500) on the
    # assignment below.
    if transaction is None:
        return jsonify({'message': 'Transaction not found'}), 404

    transaction.status = 1
    db.session.commit()
    return jsonify(transaction.to_dict())
|
||
|
||
|
||
|
||
# Delete: remove a transaction record
@blus.route('/api/transactions/<int:transaction_id>', methods=['DELETE'])
def delete_transaction(transaction_id):
    """Delete the transaction with the given id; HTTP 404 when it is missing."""
    record = FinancialTransaction.query.get(transaction_id)
    if record is None:
        return jsonify({'message': 'Transaction not found'}), 404

    session = db.session
    session.delete(record)
    session.commit()
    return jsonify({'message': 'Transaction deleted'}), 200
|
||
|
||
|
||
# Raw SQL query
@blus.route('/api/mysql', methods=['POST'])
def mysql():
    """Run the SQL statement from the JSON body and return the rows as JSON.

    The body must be a JSON object with a non-empty ``sql`` value. Rows are
    fetched with a ``DictCursor`` and returned as a JSON array. A missing
    statement or a database/execution failure yields
    ``APIUtils.error_response``.

    WARNING(review): this executes arbitrary client-supplied SQL with the
    credentials in ``db_config``. It cannot be parameterized because the
    whole statement is the input; restrict it to trusted, authenticated
    callers or remove it.
    """
    data = request.get_json()

    # .get() on a verified dict: a body without "sql" used to raise KeyError.
    sql = data.get('sql') if isinstance(data, dict) else None
    if not sql:
        return APIUtils.error_response(message="没有sql参数")

    connection = None
    try:
        connection = pymysql.connect(**db_config)
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()
    except pymysql.MySQLError as e:
        return APIUtils.error_response(message=f"数据库连接失败:{str(e)}")
    except Exception as e:
        return APIUtils.error_response(message=f"查询执行失败:{str(e)}")
    finally:
        # The original never closed the connection, leaking one per request.
        if connection is not None:
            connection.close()

    # fetchall() can return an empty tuple when there are no rows, which Flask
    # cannot turn into a response; always send a JSON array instead.
    return jsonify(list(results))
|
||
|