Commit eac907a7 by Aeolus

update

parent fd44bcc2
...@@ -56,5 +56,5 @@ AGENT_STATUS = { ...@@ -56,5 +56,5 @@ AGENT_STATUS = {
ACCOUNT_STATUS = { ACCOUNT_STATUS = {
'on_use': 1, 'on_use': 1,
'delete': 2 'delete': -1
} }
...@@ -152,14 +152,15 @@ def get_account_list(): ...@@ -152,14 +152,15 @@ def get_account_list():
keyword = json_data.get("keyword", None) keyword = json_data.get("keyword", None)
select_sql = """select admin_account.user_name, admin_account.phone, admin_account.level, admin_account.status, select_sql = """select admin_account.user_name, admin_account.phone, admin_account.level, admin_account.status,
admin_account.comment,admin_account.parent_id,admin_account.rate, admin_account.created_at, admin_account.updated_at, admin_account.last_login, admin_account.comment,admin_account.parent_id,admin_account.rate, admin_account.created_at,
admin_account.id admin_account.updated_at,admin_account.id, admin_account.user_no
""" """
count_sql = "select count(admin_account.id) as total_count" count_sql = "select count(admin_account.id) as total_count"
from_sql = " from admin_account where admin_account.id in ( select admin_account.id " from_sql = " from admin_account where admin_account.id in ( select admin_account.id "
from_sql += " from admin_account " from_sql += " from admin_account "
where_sql = " where 0=0 and admin_account.level < {} and admin_account.parent = {}".format(g.user.level, g.user.id) where_sql = " where 0=0 and admin_account.level > {} and admin_account.parent_id = {}".format(g.user.level,
g.user.id)
if keyword: if keyword:
where_sql += """ where_sql += """
and CONCAT(admin_account.user_name,admin_account.phone) LIKE '%{keyword}%' and CONCAT(admin_account.user_name,admin_account.phone) LIKE '%{keyword}%'
...@@ -176,15 +177,16 @@ def get_account_list(): ...@@ -176,15 +177,16 @@ def get_account_list():
else: else:
total_count = count_result.total_count total_count = count_result.total_count
print(select_sql + from_sql + where_sql + order_sql + limit_sql)
result = db.session.execute(select_sql + from_sql + where_sql + order_sql + limit_sql).fetchall() result = db.session.execute(select_sql + from_sql + where_sql + order_sql + limit_sql).fetchall()
return_data = [] return_data = []
for info in result: for info in result:
return_data.append( return_data.append(
{"user_name": info.user_name, "phone": info.phone, "level": info.level, "status": info.status, {"user_name": info.user_name, "phone": info.phone, "level": info.level, "status": info.status,
"comment": info.comment, "admin_id": info.id, "comment": info.comment, "user_id": info.id, "user_no": info.user_no,
"create_time": info.created_at.strftime("%Y-%m-%d %H:%M:%S") if info.last_login else "", "create_time": info.created_at.strftime("%Y-%m-%d %H:%M:%S"),
"update_time": info.updated_at.strftime("%Y-%m-%d %H:%M:%S") if info.last_login else "", "update_time": info.updated_at.strftime("%Y-%m-%d %H:%M:%S"),
}) })
return BaseResponse({"list": return_data, "page": page, "pageSize": page_size, "total_count": total_count}) return BaseResponse({"list": return_data, "page": page, "pageSize": page_size, "total_count": total_count})
...@@ -198,13 +200,14 @@ def get_account_detail(): ...@@ -198,13 +200,14 @@ def get_account_detail():
admin_info = g.user admin_info = g.user
else: else:
admin_info = AdminAccount.query.filter(AdminAccount.phone == phone, admin_info = AdminAccount.query.filter(AdminAccount.phone == phone,
AdminAccount.level < g.user.level, AdminAccount.level > g.user.level,
AdminAccount.parent_id == g.user.id).first() AdminAccount.parent_id == g.user.id).first()
if not admin_info: if not admin_info:
return BaseResponse(**ACCOUNT_NOT_EXISTS_ERROR) return BaseResponse(**ACCOUNT_NOT_EXISTS_ERROR)
user_info = { user_info = {
"admin_no": admin_info.admin_no, "user_id": admin_info.id,
"user_no": admin_info.user_no,
"user_name": admin_info.user_name, "user_name": admin_info.user_name,
"phone": admin_info.phone, "phone": admin_info.phone,
"level": admin_info.level, "level": admin_info.level,
...@@ -220,7 +223,7 @@ def edit_user(): ...@@ -220,7 +223,7 @@ def edit_user():
json_data = request.get_json() json_data = request.get_json()
old_phone = json_data['old_phone'] if 'old_phone' in json_data else '' old_phone = json_data['old_phone'] if 'old_phone' in json_data else ''
new_phone = json_data['new_phone'] if 'new_phone' in json_data else '' new_phone = json_data['new_phone'] if 'new_phone' in json_data else ''
user_name = json_data['name'] if 'name' in json_data else 'SSW' user_name = json_data['user_name'] if 'user_name' in json_data else 'SSW'
password = json_data['password'] if 'password' in json_data else '' password = json_data['password'] if 'password' in json_data else ''
comment = json_data['comment'] if 'comment' in json_data else '' comment = json_data['comment'] if 'comment' in json_data else ''
level = json_data['level'] if 'level' in json_data else '' level = json_data['level'] if 'level' in json_data else ''
...@@ -240,7 +243,7 @@ def edit_user(): ...@@ -240,7 +243,7 @@ def edit_user():
admin_info.level = int(level) admin_info.level = int(level)
else: else:
admin_info = AdminAccount.query.filter(AdminAccount.phone == old_phone, admin_info = AdminAccount.query.filter(AdminAccount.phone == old_phone,
AdminAccount.level < g.user.level, AdminAccount.level > g.user.level,
AdminAccount.parent_id == g.user.id AdminAccount.parent_id == g.user.id
).first() ).first()
if not admin_info: if not admin_info:
...@@ -267,7 +270,7 @@ def delete_user(): ...@@ -267,7 +270,7 @@ def delete_user():
return BaseResponse(**PHONE_NOT_NULL_ERROR) return BaseResponse(**PHONE_NOT_NULL_ERROR)
admin_info = AdminAccount.query.filter(AdminAccount.phone == phone, admin_info = AdminAccount.query.filter(AdminAccount.phone == phone,
AdminAccount.level < g.user.level).first() AdminAccount.level > g.user.level).first()
if not admin_info: if not admin_info:
return BaseResponse(**ACCOUNT_NOT_EXISTS_ERROR) return BaseResponse(**ACCOUNT_NOT_EXISTS_ERROR)
......
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount, AdminAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ path_list = request.path.split("/") if current_app.name == "sukang24h": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), url_for('machine.run_get_machine_no'), url_for('nfc_card.run_nfc_card_wx_pay_callback'), url_for('nfc_card.run_nfc_card_user_pay_record'), url_for('nfc_card.run_nfc_card_load_succeed'), url_for('nfc_card.run_nfc_card_user_load_record'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) if request.path.split("/")[2] == "machine": user_no = payload.get('user_no', None) user_id = payload.get('user_id', None) if user_no: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) if user_id: try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) return BaseResponse(**TOKEN_NOT_VALID_ERROR) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) elif current_app.name == "pc_management": NO_AUTH_CHECK_URL = [url_for("admin.user_login"), url_for("admin.send_code"), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id', None) if user_id: g.user = AdminAccount.query.filter_by(id=user_id).first() if g.user: return return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] return def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers)) #!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount, AdminAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ path_list = request.path.split("/") if current_app.name == "sukang24h": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), url_for('machine.run_get_machine_no'), url_for('nfc_card.run_nfc_card_wx_pay_callback'), url_for('nfc_card.run_nfc_card_user_pay_record'), url_for('nfc_card.run_nfc_card_load_succeed'), url_for('nfc_card.run_nfc_card_user_load_record'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) if request.path.split("/")[2] == "machine": user_no = payload.get('user_no', None) user_id = payload.get('user_id', None) if user_no: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) if user_id: try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) return BaseResponse(**TOKEN_NOT_VALID_ERROR) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) elif current_app.name == "pc_management": NO_AUTH_CHECK_URL = [url_for("admin.user_login"), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id', None) if user_id: g.user = AdminAccount.query.filter_by(id=user_id).first() if g.user: return return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] return def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file \ No newline at end of file
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment