Commit 92264414 by Aeolus

update

parent 06dfc370
......@@ -119,3 +119,18 @@ def run_create_machine_no():
db.session.add(machine_model)
db.session.commit()
return BaseResponse(data={"machine_no": machine_no})
@machine_route.route('bind_serial_num', methods=["post"])
def run_bind_serial_num():
json_data = request.get_json()
machine_no = json_data["machine_no"]
serial_num = json_data["serial_num"]
machine = Machine.query.filter_by(machine_no=machine_no).first()
if not machine:
return jsonify(MACHINE_NOT_EXIST_ERROR)
machine.device_id = serial_num
db.session.add(machine)
db.session.commit()
return BaseResponse(data={"machine_no": machine_no, "serial_num": serial_num})
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount, AdminAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ path_list = request.path.split("/") if current_app.name == "automat": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), url_for('machine.run_get_machine_no'), url_for('machine.run_create_machine_no'), url_for('nfc_card.run_nfc_card_wx_pay_callback'), url_for('nfc_card.run_nfc_card_user_pay_record'), url_for('nfc_card.run_nfc_card_load_succeed'), url_for('nfc_card.run_nfc_card_user_load_record'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) if request.path.split("/")[2] == "machine": user_no = payload.get('user_no', None) user_id = payload.get('user_id', None) if user_no: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) if user_id: try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) return BaseResponse(**TOKEN_NOT_VALID_ERROR) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) elif current_app.name == "pc_management": NO_AUTH_CHECK_URL = [url_for("admin.user_login"), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id', None) if user_id: g.user = AdminAccount.query.filter_by(id=user_id).first() if g.user: return return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] return def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount, AdminAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ path_list = request.path.split("/") if current_app.name == "automat": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), url_for('machine.run_get_machine_no'), url_for('machine.run_create_machine_no'), url_for('machine.run_bind_serial_num'), url_for('nfc_card.run_nfc_card_wx_pay_callback'), url_for('nfc_card.run_nfc_card_user_pay_record'), url_for('nfc_card.run_nfc_card_load_succeed'), url_for('nfc_card.run_nfc_card_user_load_record'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) if request.path.split("/")[2] == "machine": user_no = payload.get('user_no', None) user_id = payload.get('user_id', None) if user_no: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) if user_id: try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) return BaseResponse(**TOKEN_NOT_VALID_ERROR) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) elif current_app.name == "pc_management": NO_AUTH_CHECK_URL = [url_for("admin.user_login"), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id', None) if user_id: g.user = AdminAccount.query.filter_by(id=user_id).first() if g.user: return return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] return def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment