| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- #user.py
-
- from app.libs.redprint import Redprint
- from flask import jsonify, request
- from app.app import mongo
- from app.app import jwt
- from bson.json_util import dumps
- from bson.objectid import ObjectId
- from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
- import bcrypt
- from pymongo.errors import DuplicateKeyError
- from werkzeug.exceptions import BadRequest
-
# Initialize the Redprint that the route decorators in this module attach to.
api = Redprint('user')
-
-
@api.route('/get')
def get_user2():
    """Handle the ``/get`` route by returning a fixed placeholder string."""
    placeholder = "I am user get method"
    return placeholder
-
-
def check_password(username, password):
    """Verify a username/password pair against the stored bcrypt hash.

    Args:
        username: Value matched against the ``username`` field of the
            ``users`` collection.
        password: Plaintext password to verify.

    Returns:
        A ``(payload, status_code)`` tuple:

        * ``({"message": "Password correct"}, 200)`` when the password matches,
        * ``({"error": ...}, 400)`` when either credential is not a string,
        * ``({"error": ...}, 404)`` when no user has that username,
        * ``({"error": ...}, 401)`` when the password does not match.
    """
    # Both values originate from request JSON. Reject anything that is not a
    # plain string before it reaches the database: a dict such as
    # {"$ne": None} would be interpreted by MongoDB as a query operator, and
    # a missing (None) password would raise AttributeError on .encode().
    if not isinstance(username, str) or not isinstance(password, str):
        return {"error": "用户名或密码缺失或格式不正确."}, 400

    # Look the user up by exact username.
    user = mongo.db.users.find_one({"username": username})
    if user is None:
        return {"error": "此用户名不存在."}, 404

    # Compare the supplied password with the stored hash.
    if bcrypt.checkpw(password.encode('utf-8'), user['password_hash']):
        return {"message": "Password correct"}, 200
    return {"error": "密码错误."}, 401
-
-
def make_jwt_info(user):
    """Create a JWT access token for ``user``.

    The stringified ``_id`` becomes the token identity, and the ``email``,
    ``username``, ``avatar_url`` and ``bio`` values are copied into the
    token's additional claims.

    Args:
        user: Mapping that provides the keys listed above plus ``_id``.

    Returns:
        Whatever ``create_access_token`` returns for that identity and claims.
    """
    identity = str(user['_id'])
    claim_fields = ('email', 'username', 'avatar_url', 'bio')
    extra_claims = {field: user[field] for field in claim_fields}
    return create_access_token(identity=identity, additional_claims=extra_claims)
-
-
@api.route('/pwdlogin', methods=['POST'])
def user_pwd_login():
    """Log a user in with username and password and return a JWT.

    Expects a JSON object with string ``username`` and ``password`` fields.

    Returns:
        ``{"token": <jwt>}`` with 200 when the credentials are valid,
        a 400 error when the body is not a JSON object or a credential is
        missing/not a string, otherwise the error payload and status code
        produced by ``check_password``.
    """
    payload = request.get_json()
    # The body may legally be any JSON value (null, list, number, ...);
    # only an object has the fields we need.
    if not isinstance(payload, dict):
        return jsonify({"error": "请求体必须是JSON对象."}), 400

    username = payload.get('username')
    password = payload.get('password')
    # Non-string values must never reach the MongoDB query or bcrypt:
    # a dict would act as a query operator and None would crash .encode().
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({"error": "用户名或密码缺失或格式不正确."}), 400

    result, status_code = check_password(username, password)
    if result.get('message') != "Password correct":
        # Pass through the error payload and status chosen by check_password.
        return jsonify(result), status_code

    user = mongo.db.users.find_one({"username": username})
    access_token = make_jwt_info(user)
    return jsonify({'token': access_token}), 200
-
-
def InitUserInfo(user_info):
    """Prepare a registration payload for storage, in place.

    Sets the default ``avatar_url`` and ``bio``, stores the bcrypt hash of
    ``user_info["password"]`` under ``password_hash`` and removes the
    plaintext ``password`` entry.

    Args:
        user_info: Mutable mapping that contains a string ``password``.

    Raises:
        KeyError: If ``user_info`` has no ``password`` entry.
    """
    user_info["avatar_url"] = "https://afanai.top:8088/imgs/default_avatar_1.jpeg"
    user_info["bio"] = "永远不要降低心灵飞翔的高度"
    user_info["password_hash"] = bcrypt.hashpw(
        user_info["password"].encode('utf-8'), bcrypt.gensalt()
    )
    # The register route inserts this same dict into MongoDB, so the
    # plaintext password must not remain in it next to the hash.
    del user_info["password"]
-
-
@api.route('/register', methods=['POST'])
def user_pwd_register():
    """Register a new user from a JSON object.

    The body must be a JSON object with non-empty string ``username`` and
    ``password`` fields; it is passed through ``InitUserInfo`` and inserted
    into the ``users`` collection.

    Returns:
        201 with a success message, 400 when the body or the required fields
        are invalid, 409 when the insert reports a duplicate ``username`` or
        ``email``, and 500 for a duplicate-key error on any other field.
    """
    user_info = request.get_json()
    if not isinstance(user_info, dict):
        return jsonify({"error": "请求体必须是JSON对象."}), 400

    # Validate before inserting: previously a missing username only failed
    # after the document had already been written, and a missing password
    # surfaced as an unhandled KeyError.
    username = user_info.get("username")
    if not isinstance(username, str) or not username:
        return jsonify({"error": "用户名不能为空."}), 400
    password = user_info.get("password")
    if not isinstance(password, str) or not password:
        return jsonify({"error": "密码不能为空."}), 400

    InitUserInfo(user_info)
    try:
        mongo.db.users.insert_one(user_info)
    except DuplicateKeyError as e:
        # Work out which field collided from the exception text.
        error_message = str(e)
        if 'username' in error_message:
            return jsonify({"error": "此用户名已存在."}), 409
        if 'email' in error_message:
            return jsonify({"error": "此邮箱已被注册."}), 409
        # The message names neither field; report a generic failure.
        return jsonify({"error": "未知错误."}), 500
    return jsonify({"message": f"User {username} registered successfully."}), 201
-
-
@api.route('/getall')
def get_all_user():
    """Return every document of the ``users`` collection as a JSON list.

    Each document's ``_id`` is converted to a string so it can be serialized,
    ``password_hash`` is replaced by a fixed mask, and any plaintext
    ``password`` field is removed from the response.
    """
    user_infos = list(mongo.db.users.find())
    for user_info in user_infos:
        # ObjectId is not JSON serializable; send its string form.
        user_info["_id"] = str(user_info["_id"])
        user_info["password_hash"] = "******"
        # Never echo a plaintext password if a stored document carries one.
        user_info.pop("password", None)
    return jsonify(user_infos)
-
-
@api.route('/modify/<user_id>', methods=['PUT'])
def modify_specify_user(user_id):
    """Update fields of the user identified by ``user_id`` and reissue a JWT.

    The body must be a non-empty JSON object. Client-supplied ``_id`` and
    ``password_hash`` entries are ignored; a ``password`` entry is hashed with
    bcrypt and stored as ``password_hash`` instead of being written as
    plaintext.

    Args:
        user_id: String form of the user's ObjectId, taken from the URL.

    Returns:
        201 with ``{"token": <jwt>}`` built from the updated document,
        400 for an empty/invalid body, invalid ``user_id`` or invalid
        password, 404 when no document was modified, 500 on any other error.
    """
    try:
        item = request.get_json()
        if not item:
            raise BadRequest("请求体为空")
        if not isinstance(item, dict):
            raise BadRequest("请求体必须是JSON对象")
        # A malformed id is a client error, not a server failure.
        if not ObjectId.is_valid(user_id):
            raise BadRequest("无效的用户ID")

        # _id is immutable and password_hash must only ever be derived from a
        # password here, so neither is taken from the client as-is.
        protected_fields = ("_id", "password_hash", "password")
        updates = {
            field: value
            for field, value in item.items()
            if field not in protected_fields
        }
        if "password" in item:
            new_password = item["password"]
            if not isinstance(new_password, str) or not new_password:
                raise BadRequest("密码格式不正确")
            updates["password_hash"] = bcrypt.hashpw(
                new_password.encode('utf-8'), bcrypt.gensalt()
            )
        if not updates:
            raise BadRequest("没有可更新的字段")

        user_id_obj = ObjectId(user_id)
        result = mongo.db.users.update_one({"_id": user_id_obj}, {"$set": updates})
        # Nothing was found or nothing changed.
        if result.modified_count == 0:
            return jsonify({"error": "未找到用户或没有数据被修改"}), 404

        # Claims are embedded in the token, so issue a fresh one.
        user = mongo.db.users.find_one({"_id": user_id_obj})
        access_token = make_jwt_info(user)
        return jsonify({'token': access_token}), 201
    except BadRequest as e:
        return jsonify({"error": str(e)}), 400
    except Exception:
        return jsonify({"error": "内部服务器错误"}), 500
-
-
@api.route('/search', methods=['GET'])
def search_specify_user():
    """Look up a single user whose username equals the ``query`` parameter.

    Returns:
        200 with ``{"queryRes": [<user summary>]}`` when a user matches,
        404 with ``{"queryRes": "None Result"}`` when none does,
        400 when ``query`` is missing or empty, 500 on any other error.
    """
    try:
        # Read the search term from the query string.
        term = request.args.get('query')
        print(term)
        if not term:
            raise BadRequest("查询参数为空")

        # Exact match on username.
        match = mongo.db.users.find_one({"username": term})
        if not match:
            return jsonify({'queryRes': "None Result"}), 404

        hits = [{
            "userId": str(match["_id"]),
            "username": match["username"],
            "avatar_url": match["avatar_url"],
            "bio": match["bio"],
        }]
        print(hits)
        return jsonify({'queryRes': hits}), 200
    except BadRequest as bad_request:
        return jsonify({"error": str(bad_request)}), 400
    except Exception:
        return jsonify({"error": "内部服务器错误"}), 500
-
-
- # @api.route('/items', methods=['POST'])
- # def add_item():
- # item = request.get_json()
- # result = db.items.insert_one(item)
- # return jsonify({"_id": str(result.inserted_id)})
-
- # @api.route('/items/<item_id>', methods=['PUT'])
- # def update_item(item_id):
- # item = request.get_json()
- # result = db.items.update_one({"_id": ObjectId(item_id)}, {"$set": item})
- # return jsonify({"modified_count": result.modified_count})
-
- # @api.route('/items/<item_id>', methods=['DELETE'])
- # def delete_item(item_id):
- # result = db.items.delete_one({"_id": ObjectId(item_id)})
- # return jsonify({"deleted_count": result.deleted_count})
-
-
- # @app.route('/login', methods=['GET', 'POST'])
- # def login_page():
- # if request.method == 'POST':
- # username = request.form['username']
- # password = request.form['password']
- # # 在此处验证用户凭据
- # user = User()
- # user.id = username
- # login_user(user)
- # return redirect(url_for('protected_page'))
- # return render_template('login.html')
-
- # @app.route('/logout')
- # def logout_page():
- # if current_user.is_active:
- # logout_user()
- # return 'Logged out'
- # else:
- # return "you aren't login"
|