Commit 33c765f0 by Aeolus

update

parent d8c89673
...@@ -33,3 +33,10 @@ WX_SESSION_KEY = "W_S_K_" ...@@ -33,3 +33,10 @@ WX_SESSION_KEY = "W_S_K_"
WX_ACCESS_TOKEN = "W_A_T_" WX_ACCESS_TOKEN = "W_A_T_"
# 微信公众号access_token过期时间 # 微信公众号access_token过期时间
WX_ACCESS_TOKEN_EXPIRE = "W_A_T_E_" WX_ACCESS_TOKEN_EXPIRE = "W_A_T_E_"
LOGIN_TYPE = {
'code_login': 1,
'token_login': 2,
'send_code': 3,
'password': 4
}
# coding: utf-8 # coding: utf-8
from sqlalchemy import Column, DateTime, Index, String, TIMESTAMP, Text, text from sqlalchemy import Column, DateTime, Index, String, TIMESTAMP, Text, text, FetchedValue
from sqlalchemy.dialects.mysql import INTEGER, TINYINT, VARCHAR from sqlalchemy.dialects.mysql import INTEGER, TINYINT, VARCHAR
from werkzeug.security import generate_password_hash, check_password_hash
from models.base_model import Base from models.base_model import Base
...@@ -213,28 +214,52 @@ class TallymanAccount(Base): ...@@ -213,28 +214,52 @@ class TallymanAccount(Base):
phone = Column(String(191, 'utf8mb4_unicode_ci'), nullable=False, unique=True) phone = Column(String(191, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
level = Column(INTEGER(1), nullable=False, comment='1:补货员') level = Column(INTEGER(1), nullable=False, comment='1:补货员')
status = Column(INTEGER(1), nullable=False, comment='1:正常 2:删除') status = Column(INTEGER(1), nullable=False, comment='1:正常 2:删除')
password = Column(String(255, 'utf8mb4_unicode_ci')) _password_hash_ = Column(String(255, 'utf8mb4_unicode_ci'))
comment = Column(String(255, 'utf8mb4_unicode_ci')) comment = Column(String(255, 'utf8mb4_unicode_ci'))
last_login = Column(DateTime) last_login = Column(DateTime)
expire_time = Column(DateTime) expire_time = Column(DateTime)
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP")) created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")) updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
@property
def password(self):
raise Exception('密码不能被读取') # 为了保持使用习惯,还是设置一个password字段用来设置密码,当然也不能被读取。
# 赋值password,则自动加密存储。
@password.setter
def password(self, value):
self._password_hash_ = generate_password_hash(value)
# 使用check_password,进行密码校验,返回True False。
def check_password(self, pasword):
return check_password_hash(self._password_hash_, pasword)
class TallymanMachine(Base): class TallymanMachine(Base):
__tablename__ = 'tallyman_machine' __tablename__ = 'tallyman_machine'
__table_args__ = ( __table_args__ = (
Index('index4tallymachine_user_machine_unique', 'user_id', 'machine_id', unique=True), Index('index4tallymachine_user_machine_unique', 'user_id', 'machine_no', unique=True),
) )
id = Column(INTEGER(10), primary_key=True) id = Column(INTEGER(10), primary_key=True)
user_id = Column(INTEGER(10), nullable=False, index=True) user_id = Column(INTEGER(10), nullable=False, index=True)
machine_id = Column(INTEGER(10), nullable=False, index=True) machine_no = Column(String(20, 'utf8mb4_unicode_ci'), nullable=False, comment='机柜id')
status = Column(INTEGER(1), nullable=False, index=True, comment='1:正常 -1:删除') status = Column(INTEGER(1), nullable=False, index=True, comment='1:正常 -1:删除')
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP")) created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")) updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class TallymanLoginRecord(Base):
__tablename__ = 'tallyman_login_record'
id = Column(INTEGER, primary_key=True)
phone = Column(String(255, 'utf8mb4_bin'), nullable=False, server_default=FetchedValue())
ip = Column(String(255, 'utf8mb4_bin'), nullable=False, server_default=FetchedValue())
platform = Column(INTEGER, nullable=False, server_default=FetchedValue(), comment='平台 2小导游 8商户PC')
last_login = Column(DateTime, nullable=False, server_default=FetchedValue())
login_type = Column(INTEGER, nullable=False, server_default=FetchedValue(), comment='1:验证码登录 2:token 3:发送验证码 4:密码')
class WxUser(Base): class WxUser(Base):
__tablename__ = 'wx_user' __tablename__ = 'wx_user'
__table_args__ = {'comment': '微信用户表'} __table_args__ = {'comment': '微信用户表'}
......
...@@ -10,6 +10,7 @@ from flask import Flask ...@@ -10,6 +10,7 @@ from flask import Flask
from myapps.sukang24h.api.wx_auth_portal import wx_auth_route from myapps.sukang24h.api.wx_auth_portal import wx_auth_route
from myapps.sukang24h.api.hatch_portal import hatch_route from myapps.sukang24h.api.hatch_portal import hatch_route
from myapps.sukang24h.api.rent_portal import rent_route from myapps.sukang24h.api.rent_portal import rent_route
from myapps.sukang24h.api.tallyman_portal import tallyman_route
def register_sukang_blueprint(app: Flask): def register_sukang_blueprint(app: Flask):
...@@ -17,3 +18,4 @@ def register_sukang_blueprint(app: Flask): ...@@ -17,3 +18,4 @@ def register_sukang_blueprint(app: Flask):
app.register_blueprint(wx_auth_route, url_prefix=prefix + "/wx_auth") app.register_blueprint(wx_auth_route, url_prefix=prefix + "/wx_auth")
app.register_blueprint(hatch_route, url_prefix=prefix + "/hatch") app.register_blueprint(hatch_route, url_prefix=prefix + "/hatch")
app.register_blueprint(rent_route, url_prefix=prefix + "/rent") app.register_blueprint(rent_route, url_prefix=prefix + "/rent")
app.register_blueprint(tallyman_route, url_prefix=prefix + "/tallyman")
...@@ -22,7 +22,7 @@ from models.models import Machine, Hatch, Rent, WxUser, RentDetail ...@@ -22,7 +22,7 @@ from models.models import Machine, Hatch, Rent, WxUser, RentDetail
from service.rent_service import RentService from service.rent_service import RentService
from service.wechat_service import WeChatPayService from service.wechat_service import WeChatPayService
from utils.error_code import Param_Invalid_Error, MACHINE_NOT_EXIST_ERROR, HATCH_NOT_EXIST_ERROR, \ from utils.error_code import Param_Invalid_Error, MACHINE_NOT_EXIST_ERROR, HATCH_NOT_EXIST_ERROR, \
HATCH_NOT_ALL_EXIST_ERROR, WE_MINIAPP_PAY_FAIL, NO_RENT_RECORD HATCH_NOT_ALL_EXIST_ERROR, WE_MINIAPP_PAY_FAIL, NO_RENT_RECORD, HATCH_COUNT_ERROR
from utils.my_redis_cache import redis_client from utils.my_redis_cache import redis_client
from utils.my_response import BaseResponse from utils.my_response import BaseResponse
...@@ -53,7 +53,7 @@ def create_rent(): ...@@ -53,7 +53,7 @@ def create_rent():
return jsonify(HATCH_NOT_EXIST_ERROR) return jsonify(HATCH_NOT_EXIST_ERROR)
if len(hatch_list) < int(count): if len(hatch_list) < int(count):
return jsonify(HATCH_NOT_ALL_EXIST_ERROR) return jsonify(HATCH_COUNT_ERROR)
total_fee += hatch_list[0].price * int(count) total_fee += hatch_list[0].price * int(count)
......
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: tally_portal.py
"""
import logging
from flask import Blueprint, request
logger = logging.getLogger(__name__)
tally_route = Blueprint('tally', __name__)
@tally_route.route("/login", methods = ["POST"])
def run_tally_login():
json_data = request.get_json()
user_name = json_data["user_name"]
password = json_data["password"]
# -*- coding: utf-8 -*-
import logging
from models.models import TallymanMachine, Machine, Hatch
logger = logging.getLogger(__name__)
class TallymanService(object):
@classmethod
def get_machine_info(cls, tallyman):
machine_infos = Machine.query(Machine).join(TallymanMachine,
TallymanMachine.machine_no == Machine.machine_no).filter(
user_id=tallyman.id, status=1).all()
return_data = []
for tmp_machine in machine_infos:
cur_machine = {}
cur_machine['machine_no'] = tmp_machine.machine_no
cur_machine['short_address'] = tmp_machine.short_address
cur_machine['address'] = tmp_machine.address
cur_machine['place_id'] = tmp_machine.place_id
cur_machine['empty_number'] = Hatch.query.filter(Hatch.machine_no == tmp_machine.machine_no,
Hatch.status == 2).count()
return_data.append(cur_machine)
return return_data
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/30 @file: jwt_util.py @function: @modify: """ import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['SECRET_KEY'] token = jwt.encode(_payload, secret, algorithm='HS256') return token def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['SECRET_KEY'] try: payload = jwt.decode(token, secret, algorithms=['HS256']) except jwt.PyJWTError: payload = None return payload if __name__ == '__main__': import time from config.env_path_config import env_path from dotenv import load_dotenv load_dotenv(dotenv_path=env_path, verbose=True, override=True) import os SECRET_KEY = os.getenv('SECRET_KEY') token = generate_jwt({"user_id": 1}, time.time() + 6000, SECRET_KEY) print(token) # for i in range(10): # result = verify_jwt(token, 'secret') # print(result) # print(time.time()) # time.sleep(1) #!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/30 @file: jwt_util.py @function: @modify: """ import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['SECRET_KEY'] token = jwt.encode(_payload, secret, algorithm='HS256') return token def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['SECRET_KEY'] try: payload = jwt.decode(token, secret, algorithms=['HS256']) except jwt.PyJWTError: payload = None return payload if __name__ == '__main__': import time from config.env_path_config import env_path from dotenv import load_dotenv load_dotenv(dotenv_path=env_path, verbose=True, override=True) import os SECRET_KEY = os.getenv('SECRET_KEY') token = generate_jwt({"user_id": 1}, time.time() + 6000, SECRET_KEY) # token = generate_jwt({"user_no": 'SK000001'}, time.time() + 6000, SECRET_KEY) print(token) # for i in range(10): # result = verify_jwt(token, 'secret') # print(result) # print(time.time()) # time.sleep(1)
\ No newline at end of file \ No newline at end of file
......
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ if current_app.name == "sukang24h": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), ] else: NO_AUTH_CHECK_URL = [] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() return except Exception as e: print(e) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers)) #!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ print(request.path) path_list = request.path.split("/") if current_app.name == "sukang24h": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() return except Exception as e: print(e) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file \ No newline at end of file
......
# -*- coding: utf-8 -*-
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment