Commit 09261a14 by Aeolus

update

parent 1c904553
......@@ -58,3 +58,8 @@ ACCOUNT_STATUS = {
'on_use': 1,
'delete': -1
}
DISCOUNTS_TYPES = {
0: "暂无折扣",
1: "每日收单免费",
}
......@@ -145,6 +145,7 @@ class Machine(Base):
created_at = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
command_time = Column(INTEGER(1), nullable=False, server_default=text("'1'"))
discounts_id = Column(INTEGER(11), nullable=False, server_default=text("'0'"), comment="折扣活动")
class MachineProduction(Base):
......
......@@ -22,7 +22,7 @@ def create_app(config_name):
load_dotenv(dotenv_path=env_path, verbose=True, override=True)
set_logger()
app = Flask("sukang24h")
app = Flask("automat")
from config.app_config import config
app.config.from_object(config[config_name])
CORS(app)
......@@ -40,6 +40,6 @@ def create_app(config_name):
app.after_request(close_db_session)
# todo register blueprint
from myapps.sukang24h.api import register_sukang_blueprint
from myapps.automat.api import register_sukang_blueprint
register_sukang_blueprint(app)
return app
......@@ -7,17 +7,17 @@ author:Aeolus
"""
from flask import Flask
from myapps.sukang24h.api.wx_auth_portal import wx_auth_route
from myapps.sukang24h.api.hatch_portal import hatch_route
from myapps.sukang24h.api.rent_portal import rent_route
from myapps.sukang24h.api.tallyman_portal import tallyman_route
from myapps.sukang24h.api.machine_portal import machine_route
from myapps.sukang24h.api.nfc_card_portal import nfc_card_route
from myapps.sukang24h.api.card_edit_portal import card_edit_route
from myapps.automat.api.wx_auth_portal import wx_auth_route
from myapps.automat.api.hatch_portal import hatch_route
from myapps.automat.api.rent_portal import rent_route
from myapps.automat.api.tallyman_portal import tallyman_route
from myapps.automat.api.machine_portal import machine_route
from myapps.automat.api.nfc_card_portal import nfc_card_route
from myapps.automat.api.card_edit_portal import card_edit_route
def register_sukang_blueprint(app: Flask):
prefix = "/sukang"
prefix = "/automat"
app.register_blueprint(wx_auth_route, url_prefix=prefix + "/wx_auth")
app.register_blueprint(hatch_route, url_prefix=prefix + "/hatch")
app.register_blueprint(rent_route, url_prefix=prefix + "/rent")
......
......@@ -15,7 +15,7 @@ from functools import reduce
from flask import Blueprint, request, jsonify, g
from sqlalchemy import func
from config.commen_config import USER_RENT_PREPAY_ID, RENT_SALE_LOCK
from config.commen_config import USER_RENT_PREPAY_ID, RENT_SALE_LOCK, DISCOUNTS_TYPES
from config.wechat_config import platform_appid_config_list, pay_config_list
from models.base_model import db
from models.models import Machine, Hatch, Rent, WxUser, RentDetail
......@@ -31,6 +31,28 @@ logger = logging.getLogger(__name__)
rent_route = Blueprint('rent', __name__)
@rent_route.route("discounts_info", methods=["POST"])
def get_discount_info():
json_data = request.get_json()
machine_no = json_data["machine_no"]
user = g.user
# 验证机柜是否存在
machine = Machine.query.filter_by(machine_no=machine_no).first()
if not machine:
return jsonify(MACHINE_NOT_EXIST_ERROR)
if machine.discounts_id == 1:
rent = Rent.query.filter(db.cast(Rent.created_at, db.DATE) == db.cast(datetime.datetime.now(), db.DATE)).first()
if rent:
is_used = 1
else:
is_used = 0
return_data = {"discounts_id": machine.discounts_id,
"discounts_name": DISCOUNTS_TYPES[int(machine.discounts_id)],
"is_used": is_used}
return BaseResponse(data=return_data)
@rent_route.route("rent", methods=["POST"])
def create_rent():
json_data = request.get_json()
......
......@@ -12,6 +12,7 @@ import logging
from flask import Blueprint, g, request, jsonify
from config.commen_config import DISCOUNTS_TYPES
from models.base_model import db
from models.models import AdminMachine, Machine
from utils.error_code import MACHINE_NOT_EXIST_ERROR
......@@ -36,7 +37,7 @@ def run_machine_list():
admin = g.user
select_sql = """select machine.id, machine.machine_no, machine.device_id, machine.qrcode_no,machine.status,
machine.mac, machine.power, machine.hatch_number, machine.type,machine.place_id,
place.place_name
place.place_name, machine.discounts_id
"""
count_sql = "select count(machine.id) as total_count"
from_sql = """ from machine left join place on machine.place_id = place.id
......@@ -67,7 +68,9 @@ def run_machine_list():
return_data.append(
{"machine_no": info.machine_no, "device_id": info.device_id, "place_name": info.place_name,
"mac": info.mac, "power": info.power, "hatch_number": info.hatch_number, "type": info.type,
"status": info.status, "place_id": info.place_id
"status": info.status, "place_id": info.place_id,
"discounts_id": info.discounts_id,
"discounts_name": DISCOUNTS_TYPES[int(info.discounts_id)],
})
return BaseResponse({"list": return_data, "page": page, "pageSize": page_size, "total_count": total_count})
......@@ -89,6 +92,7 @@ def run_add_machine():
hatch_number = json_data.get("hatch_number", None)
place_id = json_data.get("place_id", None)
type = json_data.get("type", 1)
discounts_id = json_data.get("type", 0)
machine_model = Machine()
machine_model.machine_no = machine_no
......@@ -101,6 +105,7 @@ def run_add_machine():
machine_model.mch_platform = 1
machine_model.hatch_number = hatch_number
machine_model.type = type
machine_model.discounts_id = discounts_id
db.session.add(machine_model)
admin_machine = AdminMachine()
......@@ -130,6 +135,7 @@ def run_edit_machine():
place_id = json_data.get("place_id", None)
type = json_data.get("type", None)
status = json_data.get("status", None)
discounts_id = json_data.get("discounts_id", None)
machine_model = Machine.query.filter_by(machine_no=machine_no).first()
if not machine_model:
......@@ -153,6 +159,8 @@ def run_edit_machine():
machine_model.type = type
if status:
machine_model.status = status
if discounts_id:
machine_model.discounts_id = discounts_id
db.session.add(machine_model)
db.session.commit()
......@@ -171,7 +179,7 @@ def get_machine_detail():
admin = g.user
select_sql = """select machine.id, machine.machine_no, machine.device_id, machine.qrcode_no,machine.status,
machine.mac, machine.power, machine.hatch_number, machine.type,machine.place_id,
place.place_name
place.place_name, machine.discounts_id
"""
from_sql = """ from machine left join place on machine.place_id = place.id
where machine.machine_no in ( select machine_no from admin_machine where
......@@ -187,6 +195,8 @@ def get_machine_detail():
info = result[0]
return BaseResponse(data={"machine_no": info.machine_no, "device_id": info.device_id, "place_name": info.place_name,
"mac": info.mac, "power": info.power, "hatch_number": info.hatch_number,
"type": info.type,
"status": info.status, "place_id": info.place_id
"type": info.type, "status": info.status, "place_id": info.place_id,
"discounts_id": info.discounts_id,
"discounts_name": DISCOUNTS_TYPES[int(info.discounts_id)],
})
......@@ -6,7 +6,7 @@
"""
import os
import logging
from myapps.sukang24h import create_app
from myapps.automat import create_app
logger = logging.getLogger(__name__)
app = create_app(os.getenv('RUN_ENV', 'prod'))
......
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/30 @file: jwt_util.py @function: @modify: """ import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['SECRET_KEY'] token = jwt.encode(_payload, secret, algorithm='HS256') return token def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['SECRET_KEY'] try: payload = jwt.decode(token, secret, algorithms=['HS256']) except jwt.PyJWTError: payload = None return payload if __name__ == '__main__': import time from config.env_path_config import env_path from dotenv import load_dotenv load_dotenv(dotenv_path=env_path, verbose=True, override=True) import os SECRET_KEY = os.getenv('SECRET_KEY') token = generate_jwt({"user_id": 1}, time.time() + 6000, SECRET_KEY) # token = generate_jwt({"user_no": 'SK000007'}, time.time() + 6000, SECRET_KEY) print(token) # for i in range(10): # result = verify_jwt(token, 'secret') # print(result) # print(time.time()) # time.sleep(1)
\ No newline at end of file
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/30 @file: jwt_util.py @function: @modify: """ import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['SECRET_KEY'] token = jwt.encode(_payload, secret, algorithm='HS256') return token def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['SECRET_KEY'] try: payload = jwt.decode(token, secret, algorithms=['HS256']) except jwt.PyJWTError: payload = None return payload if __name__ == '__main__': import time from config.env_path_config import env_path from dotenv import load_dotenv load_dotenv(dotenv_path=env_path, verbose=True, override=True) import os SECRET_KEY = os.getenv('SECRET_KEY') token = generate_jwt({"user_id": 4}, time.time() + 6000, SECRET_KEY) # token = generate_jwt({"user_no": 'SK000007'}, time.time() + 6000, SECRET_KEY) print(token) # for i in range(10): # result = verify_jwt(token, 'secret') # print(result) # print(time.time()) # time.sleep(1)
\ No newline at end of file
......
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount, AdminAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ path_list = request.path.split("/") if current_app.name == "sukang24h": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), url_for('machine.run_get_machine_no'), url_for('nfc_card.run_nfc_card_wx_pay_callback'), url_for('nfc_card.run_nfc_card_user_pay_record'), url_for('nfc_card.run_nfc_card_load_succeed'), url_for('nfc_card.run_nfc_card_user_load_record'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) if request.path.split("/")[2] == "machine": user_no = payload.get('user_no', None) user_id = payload.get('user_id', None) if user_no: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) if user_id: try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) return BaseResponse(**TOKEN_NOT_VALID_ERROR) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) elif current_app.name == "pc_management": NO_AUTH_CHECK_URL = [url_for("admin.user_login"), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id', None) if user_id: g.user = AdminAccount.query.filter_by(id=user_id).first() if g.user: return return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] return def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount, AdminAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ path_list = request.path.split("/") if current_app.name == "automat": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), url_for('machine.run_get_machine_no'), url_for('nfc_card.run_nfc_card_wx_pay_callback'), url_for('nfc_card.run_nfc_card_user_pay_record'), url_for('nfc_card.run_nfc_card_load_succeed'), url_for('nfc_card.run_nfc_card_user_load_record'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) if request.path.split("/")[2] == "machine": user_no = payload.get('user_no', None) user_id = payload.get('user_id', None) if user_no: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) if user_id: try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) return BaseResponse(**TOKEN_NOT_VALID_ERROR) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) elif current_app.name == "pc_management": NO_AUTH_CHECK_URL = [url_for("admin.user_login"), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id', None) if user_id: g.user = AdminAccount.query.filter_by(id=user_id).first() if g.user: return return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] return def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment