# views.py 路由 + 视图函数
import hashlib
import hmac
import os
from math import ceil

import pymysql
from flask import Blueprint, jsonify, request, url_for

from .models import *
from .utils.api_utils import APIUtils
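blus = Blueprint("user", __name__)

# Connection settings for the raw-SQL endpoint (``/api/mysql``); the ORM models
# use their own SQLAlchemy binding.  Each value can be overridden from the
# environment so real credentials never have to be committed to source
# control; the literal fallbacks keep local development working as before.
# NOTE(review): the fallback is the MySQL ``root`` account with a trivial
# password.  Any shared deployment should point DB_USER at a least-privilege
# (ideally read-only) account.
db_config = {
    'host': os.environ.get('DB_HOST', 'localhost'),
    'user': os.environ.get('DB_USER', 'root'),
    'password': os.environ.get('DB_PASSWORD', '123456'),
    'database': os.environ.get('DB_NAME', 'job'),
    'charset': 'utf8mb4',
}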
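# Registration
@blus.route('/api/register', methods=['POST'])
def user_register():
    """Create a new account from a JSON body ``{"username", "password"}``.

    Responds 400 when a field is missing, not a string, blank, or when the
    username is already taken; otherwise stores the user with ``role=1`` and
    returns a success response.
    """
    required_fields = ['username', 'password']
    is_valid, message = APIUtils.validate_json(request.json, required_fields)
    if not is_valid:
        return APIUtils.error_response(message, status_code=400)

    username = request.json['username']
    password = request.json['password']

    # Reject non-string or blank credentials up front: ``password.encode()``
    # below raises on a non-string (surfacing as a 500), and the login route
    # already refuses blank values, so registration should too.
    if not isinstance(username, str) or not isinstance(password, str):
        return APIUtils.error_response("用户名或密码格式错误!", status_code=400)
    if username == "" or password == "":
        return APIUtils.error_response("用户名或密码不能为空!", status_code=400)

    # Uniqueness check.  The column should also carry a UNIQUE constraint:
    # two concurrent registrations can both pass this query.
    existing_user = User.query.filter_by(username=username).first()
    if existing_user:
        return APIUtils.error_response("用户名已经存在!", status_code=400)

    # NOTE(review): unsalted SHA-256 is not a password hash -- it is fast and
    # identical passwords produce identical digests across users, so a leaked
    # table cracks quickly.  It is kept only because user_login and
    # change_password compare against this exact format; migrate all three to
    # a salted KDF (hashlib.pbkdf2_hmac / hashlib.scrypt) in a single change.
    hashed_password = hashlib.sha256(password.encode()).hexdigest()

    new_user = User(username=username, password=hashed_password, role=1)
    db.session.add(new_user)
    db.session.commit()

    # The original reported "登录成功" (login succeeded) from the register route.
    return APIUtils.success_response(message="注册成功!")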
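@blus.route('/api/login', methods=['POST'])
def user_login():
    """Verify ``{"username", "password"}`` and return the caller's identity.

    Returns 400 for a malformed body, 401 when the credentials do not match,
    and on success ``{"token": <user id>, "username": ..., "role": ...}``.
    """
    required_fields = ['username', 'password']
    is_valid, message = APIUtils.validate_json(request.json, required_fields)
    if not is_valid:
        return APIUtils.error_response(message, status_code=400)

    username = request.json['username']
    password = request.json['password']

    # Non-string values would make ``password.encode()`` raise (a 500).
    if not isinstance(username, str) or not isinstance(password, str):
        return APIUtils.error_response("用户名或密码不能为空!", status_code=400)
    if username == "" or password == "":
        return APIUtils.error_response("用户名或密码不能为空!", status_code=400)

    user = User.query.filter_by(username=username).first()

    # Hash the candidate unconditionally and compare in constant time, so an
    # unknown username costs the same as a wrong password.  Both failures now
    # share one message: the previous distinct "no such user" / "wrong
    # password" replies let a caller enumerate valid usernames (the register
    # route still discloses existence via "用户名已经存在", which should be
    # addressed separately).  Bytes are compared so a non-ASCII stored value
    # cannot make compare_digest raise.
    hashed_password = hashlib.sha256(password.encode()).hexdigest()
    stored_hash = "" if user is None else str(user.password or "")
    matches = hmac.compare_digest(hashed_password.encode(), stored_hash.encode())
    if user is None or not matches:
        return APIUtils.error_response("用户名或密码错误!", status_code=401)

    # NOTE(review): the "token" is just the numeric user id, so any client can
    # impersonate any account by guessing it -- and no other route in this file
    # checks it anyway.  Replace with a random or signed session token.
    return APIUtils.success_response(
        data={'token': user.id, 'username': user.username, 'role': user.role},
        message="登录成功!",
    )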
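@blus.route('/change_password', methods=['POST'])
def change_password():
    """Replace a user's password after verifying the current one.

    Body: ``{"username", "old_password", "new_password"}``.  Returns 400 for a
    malformed body or a blank/non-string new password, 404 when the user does
    not exist, 401 when the old password is wrong, and a success response once
    the new hash is committed.

    NOTE(review): the username is taken from the request body and nothing ties
    it to an authenticated session, so knowing a user's current password is
    the only barrier.  The route also lacks the ``/api`` prefix every other
    route uses; left unchanged so existing clients keep working.
    """
    required_fields = ['username', 'old_password', 'new_password']
    is_valid, message = APIUtils.validate_json(request.json, required_fields)
    if not is_valid:
        return APIUtils.error_response(message, status_code=400)

    username = request.json['username']
    old_password = request.json['old_password']
    new_password = request.json['new_password']

    # ``.encode()`` below requires strings; a blank new password was
    # previously accepted and stored.
    if not all(isinstance(v, str) for v in (username, old_password, new_password)):
        return APIUtils.error_response("参数格式错误!", status_code=400)
    if new_password == "":
        return APIUtils.error_response("新密码不能为空!", status_code=400)

    user = User.query.filter_by(username=username).first()
    if user is None:
        return APIUtils.error_response("用户不存在!", status_code=404)

    # Constant-time comparison on bytes (see user_login).  The unsalted
    # SHA-256 format is kept only for compatibility with register/login;
    # all three should move to a salted KDF together.
    hashed_old_password = hashlib.sha256(old_password.encode()).hexdigest()
    stored_hash = str(user.password or "")
    if not hmac.compare_digest(hashed_old_password.encode(), stored_hash.encode()):
        return APIUtils.error_response("旧密码错误!", status_code=401)

    user.password = hashlib.sha256(new_password.encode()).hexdigest()
    db.session.commit()
    return APIUtils.success_response(message="密码修改成功!")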
@blus.route('/api/user/del/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete the user with primary key ``user_id``.

    Returns 404 when no such user exists and 403 when the username is
    ``admin`` (compared case-insensitively); otherwise removes the row and
    returns a success response.

    NOTE(review): there is no authentication or authorization on this route --
    any client can delete any non-admin account by id.
    """
    target = User.query.get(user_id)
    if target is None:
        return APIUtils.error_response("用户不存在!", status_code=404)

    # The administrator account is protected by name only.
    is_admin = target.username.lower() == 'admin'
    if is_admin:
        return APIUtils.error_response("无法删除管理员账户!", status_code=403)

    session = db.session
    session.delete(target)
    session.commit()
    return APIUtils.success_response(message="用户删除成功!")
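# User management: paged listing
@blus.route('/api/users/page', methods=['GET'])
def get_users():
    """Return one page of users, optionally filtered by a username substring.

    Query parameters: ``page`` (default 1), ``per_page`` (default 10, clamped
    to 1..100) and ``username`` (case per the database collation).  The
    response is ``{"list": [...], "page": {"total", "page", "limit"}}``.

    NOTE(review): this listing has no authentication.  It no longer returns
    the stored password hash for every user, which it did before.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    # Clamp so a client cannot pass a negative page or dump the whole table
    # in a single response.
    page = max(page, 1)
    per_page = min(max(per_page, 1), 100)

    username = request.args.get('username', type=str)

    query = User.query
    if username:
        # Escape LIKE metacharacters so a filter of "%" or "_" matches those
        # literal characters rather than acting as wildcards.  The value is
        # bound as a parameter by SQLAlchemy, so this is not an injection fix.
        escaped = (
            username.replace('\\', '\\\\')
            .replace('%', '\\%')
            .replace('_', '\\_')
        )
        query = query.filter(User.username.like(f'%{escaped}%', escape='\\'))

    users_pagination = query.paginate(page=page, per_page=per_page, error_out=False)

    # Only public fields; the password column is deliberately omitted.
    users_list = [
        {
            'id': user.id,
            'username': user.username,
            'role': user.role,
        }
        for user in users_pagination.items
    ]

    response = {
        'list': users_list,
        'page': {
            'total': users_pagination.total,       # total rows matching the filter
            'page': users_pagination.page,         # current page number
            'limit': users_pagination.per_page,    # rows per page
        },
    }
    return APIUtils.success_response(data=response, message="获取用户列表成功")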
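# File upload
@blus.route('/api/upload', methods=['POST'])
def upload():
    """Store the multipart field ``file`` under ``uploads/`` and describe it.

    Returns an error response when no file was sent or the file name cannot
    be reduced to a safe name; otherwise ``{"name", "path", "url"}`` where
    ``name`` is the stored file name without its extension.

    NOTE(review): the route has no authentication, no size or content-type
    restriction (set ``MAX_CONTENT_LENGTH`` on the app at minimum), and an
    existing file with the same name is silently overwritten.
    """
    if 'file' not in request.files:
        return APIUtils.error_response(message="没有上传文件!")
    file = request.files['file']
    # A form with no file selected still submits an empty field.
    if file.filename == '':
        return APIUtils.error_response(message="没有上传文件!")

    # The client controls ``filename`` (e.g. "../../views.py"); previously it
    # was joined to the upload folder as-is, allowing writes outside it.
    filename = _safe_filename(file.filename)
    if not filename:
        return APIUtils.error_response(message="文件名无效!")

    upload_folder = "uploads"
    os.makedirs(upload_folder, exist_ok=True)  # avoids the exists/create race

    file_path = os.path.join(upload_folder, filename)
    file.save(file_path)

    # NOTE(review): scheme, host and port are hard-coded; there is also no
    # route in this file that serves ``/uploads/``, so this URL presumes one
    # exists elsewhere.
    file_url = f"http://127.0.0.1:5000/{upload_folder}/{filename}"

    response_data = {
        "name": filename.split(".")[0],
        "path": file_path,   # path the file was saved to
        "url": file_url,     # URL the file is expected to be reachable at
    }
    return APIUtils.success_response(data=response_data, message="上传成功")


def _safe_filename(raw):
    """Reduce ``raw`` to a single safe path component, or '' if nothing is left.

    Drops any directory part (either separator), keeps only alphanumerics,
    '.', '_' and '-', and strips leading dots so the result can never be a
    hidden file, ``.`` or ``..``.
    """
    base = os.path.basename(raw.replace('\\', '/'))
    cleaned = ''.join(c for c in base if c.isalnum() or c in '._-')
    return cleaned.lstrip('.')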
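# Create: add a transaction record
@blus.route('/api/transactions', methods=['POST'])
def add_transaction():
    """Insert a FinancialTransaction from the JSON body and return it.

    Required keys: ``user_id``, ``transaction_amount``, ``transaction_time``,
    ``is_fraud``; ``transaction_location``, ``device_info``, ``ip_address``
    and ``browser_info`` default to ''.  Returns 400 when the body is not a
    JSON object or a required key is missing (previously a KeyError -> 500).

    NOTE(review): no authentication; ``user_id``, ``ip_address`` and
    ``is_fraud`` are stored exactly as the client supplies them.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return APIUtils.error_response("请求体必须是JSON对象", status_code=400)

    # Same validation helper as the user routes.
    required_fields = ['user_id', 'transaction_amount', 'transaction_time', 'is_fraud']
    is_valid, message = APIUtils.validate_json(data, required_fields)
    if not is_valid:
        return APIUtils.error_response(message, status_code=400)

    new_transaction = FinancialTransaction(
        user_id=data['user_id'],
        transaction_amount=data['transaction_amount'],
        transaction_time=data['transaction_time'],
        transaction_location=data.get('transaction_location', ''),
        device_info=data.get('device_info', ''),
        ip_address=data.get('ip_address', ''),
        browser_info=data.get('browser_info', ''),
        is_fraud=data['is_fraud'],
    )
    db.session.add(new_transaction)
    db.session.commit()

    # Pass the plain dict: wrapping it in jsonify() handed a Response object
    # to success_response, which no other route in this file does.
    return APIUtils.success_response(data=new_transaction.to_dict(), message="成功")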
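# Read: list transaction records, paged
@blus.route('/api/transactions', methods=['GET'])
def get_transactions():
    """Return one page of transactions plus paging metadata.

    Query parameters ``page`` (default 1) and ``page_size`` (default 10) are
    clamped to 1.. and 1..100 respectively: ``page_size=0`` previously raised
    ZeroDivisionError and a negative page produced a negative OFFSET.

    NOTE(review): no authentication; every record is visible to any caller.
    """
    page = request.args.get('page', 1, type=int)
    page_size = request.args.get('page_size', 10, type=int)
    page = max(page, 1)
    page_size = min(max(page_size, 1), 100)

    offset = (page - 1) * page_size
    transactions = FinancialTransaction.query.offset(offset).limit(page_size).all()

    total_count = FinancialTransaction.query.count()
    total_pages = ceil(total_count / page_size)

    response = {
        'data': [transaction.to_dict() for transaction in transactions],
        'page': {
            "current_page": page,
            "page_size": page_size,
            "total_count": total_count,
            "total_pages": total_pages,
        },
    }
    return APIUtils.success_response(data=response, message="成功")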
# Read: fetch a single transaction record
@blus.route('/api/transactions/<int:transaction_id>', methods=['GET'])
def get_transaction(transaction_id):
    """Return the transaction with primary key ``transaction_id`` as JSON.

    Responds with a 404 JSON message when the record does not exist.
    NOTE(review): no authentication on this route.
    """
    record = FinancialTransaction.query.get(transaction_id)
    if record is not None:
        return jsonify(record.to_dict())
    return jsonify({'message': 'Transaction not found'}), 404
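# Update: modify a transaction record
@blus.route('/api/transactions/<int:transaction_id>', methods=['PUT'])
def update_transaction(transaction_id):
    """Partially update the transaction with primary key ``transaction_id``.

    Any of the known fields present in the JSON body replace the stored value;
    absent fields are left alone.  Returns 404 when the record does not exist
    and 400 when the body is not a JSON object (previously ``data.get`` on
    ``None`` raised AttributeError -> 500).

    NOTE(review): no authentication; the caller may also reassign ``user_id``.
    """
    transaction = FinancialTransaction.query.get(transaction_id)
    if transaction is None:
        return jsonify({'message': 'Transaction not found'}), 404

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object'}), 400

    # Each field falls back to its current value when the key is absent.
    transaction.user_id = data.get('user_id', transaction.user_id)
    transaction.transaction_amount = data.get('transaction_amount', transaction.transaction_amount)
    transaction.transaction_time = data.get('transaction_time', transaction.transaction_time)
    transaction.transaction_location = data.get('transaction_location', transaction.transaction_location)
    transaction.device_info = data.get('device_info', transaction.device_info)
    transaction.ip_address = data.get('ip_address', transaction.ip_address)
    transaction.browser_info = data.get('browser_info', transaction.browser_info)
    transaction.is_fraud = data.get('is_fraud', transaction.is_fraud)

    db.session.commit()
    return jsonify(transaction.to_dict())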
# Delete: remove a transaction record
@blus.route('/api/transactions/<int:transaction_id>', methods=['DELETE'])
def delete_transaction(transaction_id):
    """Remove the transaction with primary key ``transaction_id``.

    Responds with a 404 JSON message when it does not exist, otherwise 200
    with a confirmation message.
    NOTE(review): no authentication on this route.
    """
    record = FinancialTransaction.query.get(transaction_id)
    if record is None:
        return jsonify({'message': 'Transaction not found'}), 404

    session = db.session
    session.delete(record)
    session.commit()
    return jsonify({'message': 'Transaction deleted'}), 200
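# Raw SQL query endpoint
@blus.route('/api/mysql', methods=['POST'])
def mysql():
    """Execute the statement in ``{"sql": "..."}`` and return the rows as JSON.

    SECURITY: this route runs client-supplied SQL with the ``db_config``
    credentials and has no authentication.  As a mitigation only a single
    read-only statement (SELECT / SHOW / DESCRIBE / DESC / EXPLAIN, no ';')
    is now accepted, which limits the impact to data disclosure; it must
    still be gated behind an authorization check and a read-only database
    account before being exposed.  Error text from the driver is returned to
    the caller unchanged, as before.
    """
    data = request.get_json(silent=True)
    sql = data.get('sql') if isinstance(data, dict) else None
    # Previously a body without "sql" raised KeyError (a 500).
    if not sql or not isinstance(sql, str):
        return APIUtils.error_response(message="没有sql参数")

    statement = sql.strip().rstrip(';').strip()
    first_word = statement.split(None, 1)[0].upper() if statement else ''
    read_only = ('SELECT', 'SHOW', 'DESCRIBE', 'DESC', 'EXPLAIN')
    # ';' is rejected outright so a second statement cannot be smuggled in
    # (a literal ';' inside a string value is also rejected; acceptable here).
    if first_word not in read_only or ';' in statement:
        return APIUtils.error_response(message="仅允许单条只读查询")

    connection = None
    try:
        connection = pymysql.connect(**db_config)
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(statement)
            results = cursor.fetchall()
        # Explicit jsonify: returning a bare list only works on Flask >= 2.2.
        return jsonify(results)
    except pymysql.MySQLError as e:
        return APIUtils.error_response(message=f"数据库连接失败:{str(e)}")
    except Exception as e:
        return APIUtils.error_response(message=f"查询执行失败:{str(e)}")
    finally:
        # The connection was previously never closed, leaking one per request.
        if connection is not None:
            connection.close()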