Commit deb0e035 by Aeolus

update

parent 5ccaa448
......@@ -78,6 +78,10 @@ target/
# Media
*.jpg
*.png
*.pdf
*.jpeg
*gif
#Document
*.xls
......
......@@ -23,6 +23,7 @@ class Config:
SQLALCHEMY_POOL_RECYCLE = 1800
JWT_SECRET = SECRET_KEY
TENCENT_REDIS_URL = os.getenv("TENCENT_REDIS_URL")
MAX_CONTENT_LENGTH = 16 * 1024 * 1024
@staticmethod
def init_app(app):
......
......@@ -37,6 +37,18 @@ class AdminAccount(Base):
return check_password_hash(self._password_hash_, pasword)
class AdminBrand(Base):
__tablename__ = 'admin_brand'
id = Column(INTEGER(11), primary_key=True)
user_id = Column(INTEGER(11), nullable=False)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False)
brand_id = Column(INTEGER(11), nullable=False)
status = Column(INTEGER(1), nullable=False, server_default=text("'1'"))
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class AdminLoginRecord(Base):
__tablename__ = 'admin_login_record'
......@@ -74,21 +86,37 @@ class AdminPlace(Base):
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class Brand(Base):
__tablename__ = 'brand'
class AdminProduction(Base):
__tablename__ = 'admin_production'
id = Column(INTEGER(10), primary_key=True)
brand_name = Column(VARCHAR(191), nullable=False, index=True, comment='品牌名')
logo = Column(VARCHAR(191), nullable=False, comment='logo')
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
id = Column(INTEGER(11), primary_key=True)
user_id = Column(INTEGER(11), nullable=False)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False)
production_id = Column(INTEGER(11), nullable=False)
status = Column(INTEGER(1), nullable=False, server_default=text("'1'"))
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class AdminProductionType(Base):
__tablename__ = 'admin_production_type'
id = Column(INTEGER(11), primary_key=True)
user_id = Column(INTEGER(11), nullable=False)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False)
production_type_id = Column(INTEGER(11), nullable=False)
status = Column(INTEGER(1), nullable=False, server_default=text("'1'"))
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class Cate(Base):
__tablename__ = 'cate'
class Brand(Base):
__tablename__ = 'brand'
id = Column(INTEGER(10), primary_key=True)
cate_name = Column(VARCHAR(191), nullable=False, index=True, comment='分类名')
brand_name = Column(VARCHAR(191), nullable=False, index=True, comment='品牌名')
img = Column(VARCHAR(191), nullable=False, comment='logo')
status = Column(TINYINT(1), default=text("'1'"), comment='状态: 1正常-1删除')
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
......@@ -228,17 +256,34 @@ class Production(Base):
__tablename__ = 'production'
id = Column(INTEGER(10), primary_key=True)
name = Column(String(100, 'utf8mb4_unicode_ci'), nullable=False, index=True, comment='商品名称')
production_no = Column(String(45, 'utf8mb4_unicode_ci'), nullable=False)
production_name = Column(String(100, 'utf8mb4_unicode_ci'), nullable=False, index=True, comment='商品名称')
title = Column(String(200, 'utf8mb4_unicode_ci'), nullable=False, comment='商品标题')
brand_id = Column(INTEGER(10), nullable=False, comment='品牌ID')
cate_id = Column(INTEGER(10), nullable=False, comment='分类ID')
production_type_id = Column(INTEGER(10), nullable=False, comment='分类ID')
weight = Column(INTEGER(10), server_default=text("'0'"))
weight_unit = Column(String(10, 'utf8mb4_unicode_ci'), server_default=text("'g'"))
expiration_date = Column(INTEGER(10), server_default=text("'0'"))
expiration_date_unit = Column(String(10, 'utf8mb4_unicode_ci'), server_default=text("'月'"))
is_expiration_date = Column(TINYINT(1), server_default=text("'0'"))
weight_error = Column(INTEGER(10), server_default=text("'0'"))
price = Column(INTEGER(10), nullable=False, comment='价格')
original_price = Column(INTEGER(10), nullable=False, comment='商品原价')
img = Column(String(200, 'utf8mb4_unicode_ci'))
tags = Column(String(255, 'utf8mb4_unicode_ci'), comment='商品标签')
content = Column(Text(collation='utf8mb4_unicode_ci'), comment='商品内容')
summary = Column(Text(collation='utf8mb4_unicode_ci'), comment='商品描述')
status = Column(TINYINT(1), comment='状态: 1正常-1删除')
status = Column(TINYINT(1), default=text("'1'"), comment='状态: 1正常-1删除')
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class ProductionType(Base):
__tablename__ = 'production_type'
id = Column(INTEGER(10), primary_key=True)
production_type_name = Column(VARCHAR(191), nullable=False, index=True)
status = Column(TINYINT(1), default=text("'1'"), comment='状态: 1正常-1删除')
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
......
......@@ -14,14 +14,13 @@ class AdminAccount(Base):
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
user_name = Column(String(255, 'utf8mb4_unicode_ci'), nullable=False)
phone = Column(String(191, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
level = Column(INTEGER(1), nullable=False)
level = Column(TINYINT(2), nullable=False)
parent_id = Column(INTEGER(10), nullable=False)
draw = Column(TINYINT(1), nullable=False, server_default=text("'0'"))
rate = Column(INTEGER(10), nullable=False)
status = Column(INTEGER(1), nullable=False)
_password_hash_ = Column(String(255, 'utf8mb4_unicode_ci'))
comment = Column(String(255, 'utf8mb4_unicode_ci'))
last_login = Column(DateTime)
expire_time = Column(DateTime)
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
......@@ -46,7 +45,7 @@ class AdminMachine(Base):
user_id = Column(INTEGER(11), nullable=False)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False)
machine_no = Column(INTEGER(11), nullable=False)
status = Column(INTEGER(1), nullable=False)
status = Column(INTEGER(1), nullable=False, server_default=text("'1'"))
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
......@@ -58,30 +57,31 @@ class AdminPlace(Base):
user_id = Column(INTEGER(11), nullable=False)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False)
place_id = Column(INTEGER(11), nullable=False)
status = Column(INTEGER(1), nullable=False)
status = Column(INTEGER(1), nullable=False, server_default=text("'1'"))
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class Brand(Base):
__tablename__ = 'brand'
class AdminProduction(Base):
__tablename__ = 'admin_production'
id = Column(INTEGER(10), primary_key=True)
brand_name = Column(VARCHAR(191), nullable=False, index=True)
logo = Column(VARCHAR(191), nullable=False)
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
id = Column(INTEGER(11), primary_key=True)
user_id = Column(INTEGER(11), nullable=False)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False)
production_id = Column(INTEGER(11), nullable=False)
status = Column(INTEGER(1), nullable=False, server_default=text("'1'"))
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class Cate(Base):
__tablename__ = 'cate'
class Brand(Base):
__tablename__ = 'brand'
id = Column(INTEGER(10), primary_key=True)
brand_id = Column(INTEGER(10), nullable=False)
cate_name = Column(VARCHAR(191), nullable=False, index=True)
name = Column(VARCHAR(191), nullable=False, index=True)
img = Column(VARCHAR(191), nullable=False)
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
catecol = Column(String(45, 'utf8mb4_unicode_ci'))
class Hatch(Base):
......@@ -117,14 +117,14 @@ class Machine(Base):
__tablename__ = 'machine'
id = Column(INTEGER(10), primary_key=True)
machine_no = Column(String(20, 'utf8mb4_unicode_ci'), unique=True)
machine_no = Column(String(20, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
device_id = Column(String(45, 'utf8mb4_unicode_ci'), unique=True)
qrcode_no = Column(String(20, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
mac = Column(String(30, 'utf8mb4_unicode_ci'))
qrcode_no = Column(String(20, 'utf8mb4_unicode_ci'), unique=True)
mac = Column(String(30, 'utf8mb4_unicode_ci'), unique=True)
power = Column(TINYINT(3), nullable=False, server_default=text("'0'"))
short_address = Column(VARCHAR(45))
address = Column(String(191, 'utf8mb4_unicode_ci'))
place_id = Column(INTEGER(10), nullable=False)
place_id = Column(INTEGER(10))
mch_platform = Column(INTEGER(11), nullable=False, server_default=text("'1'"))
position = Column(String(20, 'utf8mb4_unicode_ci'))
hatch_number = Column(TINYINT(3), nullable=False, server_default=text("'0'"))
......@@ -133,6 +133,7 @@ class Machine(Base):
created_at = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
command_time = Column(INTEGER(4), nullable=False, server_default=text("'1'"))
discounts_id = Column(INTEGER(10), server_default=text("'0'"))
class MachineProduction(Base):
......@@ -236,14 +237,16 @@ class Place(Base):
__tablename__ = 'place'
id = Column(INTEGER(10), primary_key=True)
parent_id = Column(INTEGER(10))
place_name = Column(VARCHAR(191), nullable=False, index=True)
img = Column(VARCHAR(191))
logo = Column(VARCHAR(191), nullable=False)
address = Column(VARCHAR(255), nullable=False, server_default=text("''"))
logo = Column(VARCHAR(191))
address = Column(VARCHAR(255), server_default=text("''"))
position = Column(String(20, 'utf8mb4_unicode_ci'))
open_time = Column(VARCHAR(191), nullable=False)
close_time = Column(VARCHAR(191), nullable=False)
open_week = Column(VARCHAR(255), nullable=False, server_default=text("''"))
open_time = Column(VARCHAR(191))
close_time = Column(VARCHAR(191))
open_week = Column(VARCHAR(255), server_default=text("''"))
status = Column(INTEGER(1), nullable=False, server_default=text("'1'"))
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
......@@ -263,12 +266,19 @@ class Production(Base):
__tablename__ = 'production'
id = Column(INTEGER(10), primary_key=True)
name = Column(String(100, 'utf8mb4_unicode_ci'), nullable=False, index=True)
production_no = Column(String(45, 'utf8mb4_unicode_ci'), nullable=False)
production_name = Column(String(100, 'utf8mb4_unicode_ci'), nullable=False, index=True)
title = Column(String(200, 'utf8mb4_unicode_ci'), nullable=False)
brand_id = Column(INTEGER(10), nullable=False)
cate_id = Column(INTEGER(10), nullable=False)
production_type_id = Column(INTEGER(10), nullable=False)
price = Column(INTEGER(10), nullable=False)
original_price = Column(INTEGER(10), nullable=False)
weight = Column(INTEGER(10), server_default=text("'0'"))
weight_unit = Column(String(10, 'utf8mb4_unicode_ci'), server_default=text("'g'"))
expiration_date = Column(INTEGER(10), server_default=text("'0'"))
expiration_date_unit = Column(String(10, 'utf8mb4_unicode_ci'), server_default=text("''"))
is_expiration_date = Column(TINYINT(1), server_default=text("'0'"))
weight_error = Column(INTEGER(10), server_default=text("'0'"))
img = Column(String(200, 'utf8mb4_unicode_ci'))
tags = Column(String(255, 'utf8mb4_unicode_ci'))
content = Column(Text(collation='utf8mb4_unicode_ci'))
......@@ -278,6 +288,16 @@ class Production(Base):
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class ProductionType(Base):
__tablename__ = 'production_type'
id = Column(INTEGER(10), primary_key=True)
name = Column(VARCHAR(191), nullable=False, index=True)
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
catecol = Column(String(45, 'utf8mb4_unicode_ci'))
class Rent(Base):
__tablename__ = 'rent'
......
......@@ -3,7 +3,7 @@
"""
@version:
author:Aeolus
@file: upload_portal.py
@file: file_protal.py
"""
import logging
import uuid
......
......@@ -14,6 +14,8 @@ from flask import Flask
from myapps.pc_management.api.admin_portal import admin_route
from myapps.pc_management.api.place_portal import place_route
from myapps.pc_management.api.machine_portal import machine_route
from myapps.pc_management.api.file_protal import file_route
from myapps.pc_management.api.production_portal import production_route
def register_sukang_blueprint(app: Flask):
......@@ -21,3 +23,5 @@ def register_sukang_blueprint(app: Flask):
app.register_blueprint(admin_route, url_prefix=prefix + "/admin")
app.register_blueprint(place_route, url_prefix=prefix + "/place")
app.register_blueprint(machine_route, url_prefix=prefix + "/machine")
app.register_blueprint(file_route, url_prefix=prefix + "/file")
app.register_blueprint(production_route, url_prefix=prefix + "/production")
......@@ -137,7 +137,7 @@ def add_user():
account.password = password
db.session.add(account)
db.session.commit()
account.user_no = "SK" + str(account.id).zfill(6)
account.user_no = "XX" + str(account.id).zfill(6)
db.session.add(account)
db.session.commit()
......
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: file_protal.py
"""
import logging
import os
import uuid
from flask import Blueprint, request, jsonify, send_from_directory
from config.env_path_config import img_file_path
from utils.my_response import BaseResponse
logger = logging.getLogger(__name__)
file_route = Blueprint('file', __name__)
ALLOWED_EXTENSIONS = ('pdf', 'png', 'jpg', 'jpeg', 'gif')
@file_route.route("/upload_img", methods=['POST'])
def run_upload_img():
file = request.files["file"]
filename = str(file.filename)
if len(filename) >= 20: # 限制文件名的长度必须在20个字符以内
return jsonify({"error_code": "500", "error_message": "文件名长度超出了限制!"})
else:
if filename.find(".") >= 0: # 查找文件中是否包含点这个字符
filetype = file.filename.split(".")[1] # 取出文件类型,后期做判断
if filetype and filetype in ALLOWED_EXTENSIONS: # 后缀格式必须是bmp结尾
uid = uuid.uuid4() # 生成随机名称
save_file_name = str(uid) + "." + filetype # 拼接名称
print(os.path.join(img_file_path, filename))
file.save(os.path.join(img_file_path, save_file_name)) # 保存文件
return BaseResponse(data={"filename": save_file_name})
return jsonify({"error_code": "500", "error_message": "没有选择文件,或不是图片格式,上传失败!"})
@file_route.route('/img/<filename>', )
def run_download_img(filename):
return send_from_directory(img_file_path, filename)
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2022/01/12
@file: machine_portal.py
@function:
@modify:
"""
import logging
from flask import Blueprint, g, request, jsonify
from config.commen_config import DISCOUNTS_TYPES
from models.base_model import db
from models.models import AdminMachine, Machine
from utils.error_code import MACHINE_NOT_EXIST_ERROR
from utils.my_response import BaseResponse
logger = logging.getLogger(__name__)
hatch_route = Blueprint('hatch', __name__)
@hatch_route.route("hatch_list", methods=["POST"])
def run_hatch_list():
"""
:return:
"""
json_data = request.get_json()
page = json_data.get("page", None)
page_size = json_data.get("pageSize", None)
keyword = json_data.get("keyword", None)
admin = g.user
select_sql = """select machine.id, machine.machine_no, machine.device_id, machine.qrcode_no,machine.status,
machine.mac, machine.power, machine.hatch_number, machine.type,machine.place_id,
place.place_name, machine.discounts_id
"""
count_sql = "select count(machine.id) as total_count"
from_sql = """ from machine left join place on machine.place_id = place.id
where machine.machine_no in ( select machine_no from admin_machine where
admin_machine.user_id = {user_id} and admin_machine.status = 1)
""".format(user_id=admin.id)
where_sql = " "
if keyword:
where_sql += """
and CONCAT(machine.machine_no,ifnull(machine.qrcode_no,'')) LIKE '%{keyword}%'
""".format(keyword=keyword)
order_sql = " ORDER BY machine.id ASC, machine.status ASC"
limit_sql = " LIMIT {offset} , {page_size} ".format(offset=(page - 1) * page_size, page_size=page_size)
count_result = db.session.execute(count_sql + from_sql + where_sql).fetchone()
if not count_result:
return BaseResponse(data={"list": [], "page": page, "pageSize": page_size, "total_count": 0})
else:
total_count = count_result.total_count
print(select_sql + from_sql + where_sql + order_sql + limit_sql)
result = db.session.execute(select_sql + from_sql + where_sql + order_sql + limit_sql).fetchall()
return_data = []
for info in result:
return_data.append(
{"machine_no": info.machine_no, "device_id": info.device_id, "place_name": info.place_name,
"mac": info.mac, "power": info.power, "hatch_number": info.hatch_number, "type": info.type,
"status": info.status, "place_id": info.place_id,
"discounts_id": info.discounts_id,
"discounts_name": DISCOUNTS_TYPES.get(int(info.discounts_id), "无此优惠"),
})
return BaseResponse({"list": return_data, "page": page, "pageSize": page_size, "total_count": total_count})
@hatch_route.route("add_machine", methods=["POST"])
def run_add_machine():
"""
:return:
"""
json_data = request.get_json()
machine_no = json_data["machine_no"]
address = json_data.get("address", None)
device_id = json_data.get("device_id", None)
qrcode_no = json_data.get("qrcode_no", None)
mac = json_data.get("mac", None)
power = json_data.get("power", None)
hatch_number = json_data.get("hatch_number", None)
place_id = json_data.get("place_id", None)
type = json_data.get("type", 1)
discounts_id = json_data.get("type", 0)
machine_model = Machine()
machine_model.machine_no = machine_no
machine_model.device_id = device_id
machine_model.qrcode_no = qrcode_no
machine_model.mac = mac
machine_model.power = power
machine_model.place_id = place_id
machine_model.address = address
machine_model.mch_platform = 1
machine_model.hatch_number = hatch_number
machine_model.type = type
machine_model.discounts_id = discounts_id
db.session.add(machine_model)
admin_machine = AdminMachine()
admin_machine.user_id = g.user.id
admin_machine.user_no = g.user.user_no
admin_machine.machine_no = machine_model.machine_no
db.session.add(admin_machine)
db.session.commit()
return BaseResponse()
@hatch_route.route("edit_machine", methods=["POST"])
def run_edit_machine():
"""
:return:
"""
json_data = request.get_json()
machine_no = json_data["machine_no"]
address = json_data.get("address", None)
device_id = json_data.get("device_id", None)
qrcode_no = json_data.get("qrcode_no", None)
mac = json_data.get("mac", None)
power = json_data.get("power", None)
hatch_number = json_data.get("hatch_number", None)
place_id = json_data.get("place_id", None)
type = json_data.get("type", None)
status = json_data.get("status", None)
discounts_id = json_data.get("discounts_id", None)
machine_model = Machine.query.filter_by(machine_no=machine_no).first()
if not machine_model:
return jsonify(MACHINE_NOT_EXIST_ERROR)
if device_id:
machine_model.device_id = device_id
if qrcode_no:
machine_model.qrcode_no = qrcode_no
if mac:
machine_model.mac = mac
if power:
machine_model.power = power
if place_id:
machine_model.place_id = place_id
if address:
machine_model.address = address
if hatch_number:
machine_model.hatch_number = hatch_number
if type:
machine_model.type = type
if status:
machine_model.status = status
if discounts_id:
machine_model.discounts_id = discounts_id
db.session.add(machine_model)
db.session.commit()
return BaseResponse()
@hatch_route.route("machine_detail", methods=["POST"])
def get_machine_detail():
"""
:return:
"""
json_data = request.get_json()
machine_no = json_data["machine_no"]
admin = g.user
select_sql = """select machine.id, machine.machine_no, machine.device_id, machine.qrcode_no,machine.status,
machine.mac, machine.power, machine.hatch_number, machine.type,machine.place_id,
place.place_name, machine.discounts_id
"""
from_sql = """ from machine left join place on machine.place_id = place.id
where machine.machine_no in ( select machine_no from admin_machine where
admin_machine.user_id = {user_id} and admin_machine.status = 1)
""".format(user_id=admin.id)
where_sql = " and machine.machine_no = {}".format(machine_no)
result = db.session.execute(select_sql + from_sql + where_sql).fetchall()
if not result or len(result) != 1:
return jsonify(MACHINE_NOT_EXIST_ERROR)
info = result[0]
return BaseResponse(data={"machine_no": info.machine_no, "device_id": info.device_id, "place_name": info.place_name,
"mac": info.mac, "power": info.power, "hatch_number": info.hatch_number,
"type": info.type, "status": info.status, "place_id": info.place_id,
"discounts_id": info.discounts_id,
"discounts_name": DISCOUNTS_TYPES.get(int(info.discounts_id), "无此优惠"),
})
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2022/01/12
@file: production_portal.py
@function:
@modify:
"""
import logging
from flask import Blueprint, g, request, jsonify
from models.base_model import db
from models.models import AdminPlace, Place, Production, Brand, AdminBrand, ProductionType, AdminProductionType, \
AdminProduction
from utils.error_code import NO_PLACE_ERROR, NO_PRODUCTION_ERROR
from utils.my_response import BaseResponse
logger = logging.getLogger(__name__)
production_route = Blueprint('production', __name__)
@production_route.route("production_list", methods=["POST"])
def run_production_list():
"""
:return:
"""
json_data = request.get_json()
page = json_data.get("page", 1)
page_size = json_data.get("pageSize", 10)
keyword = json_data.get("keyword", None)
admin = g.user
select_sql = """select p.id, p.production_no, p.production_name, p.title, p.brand_id,p.production_type_id, p.price,p.original_price,
p.weight, p.weight_unit, p.expiration_date, p.expiration_date_unit, p.is_expiration_date,
p.weight_error, p.img, p.tags, p.content,p.summary, p.status,
brand.brand_name as brand_name, production_type.production_type_name as production_type_name
"""
count_sql = "select count(p.id) as total_count"
from_sql = """ from production p
left join brand on p.brand_id = brand.id
left join production_type on p.production_type_id = production_type.id
where p.id in ( select production_id from admin_production where
admin_production.user_id = {} and admin_production.status = 1
)
""".format(admin.id)
where_sql = " "
if keyword:
where_sql += """
and CONCAT(p.production_name, brand_name,production_type_name, p.production_no) LIKE '%{keyword}%'
""".format(keyword=keyword)
order_sql = " ORDER BY p.id ASC, p.status ASC"
limit_sql = " LIMIT {offset} , {page_size} ".format(offset=(page - 1) * page_size, page_size=page_size)
count_result = db.session.execute(count_sql + from_sql + where_sql).fetchone()
if not count_result:
return BaseResponse(data={"list": [], "page": page, "pageSize": page_size, "total_count": 0})
else:
total_count = count_result.total_count
result = db.session.execute(select_sql + from_sql + where_sql + order_sql + limit_sql).fetchall()
"""
select p.id, p.production_no, p.name, p.title, p.brand_id,p.production_type_id, p.price,p.original_price,
p.weight, p.weight_unit, p.expiration_date, p.expiration_date_unit, p.is_expiration_date,
p.weight_error, p.img, p.tags, p.content,p.summary, p.status,
brand.name as brand_name, production_type.name as production_type_name
"""
return_data = []
for info in result:
return_data.append(
{"production_name": info.production_name, "prodcution_id": info.id, "production_no": info.production_no,
"title": info.title, "brand_id": info.brand_id, "brand_name": info.brand_name,
"production_type_id": info.production_type_id, "production_type_name": info.production_type_name,
"price": info.price, "original_price": info.original_price, "weight": info.weight,
"weight_unit": info.weight_unit,
"expiration_date": info.expiration_date, "expiration_date_unit": info.expiration_date_unit,
"is_expiration_date": info.is_expiration_date,
"weight_error": info.weight_error,
"img": info.img, "tags": info.tags, "content": info.content,
"summary": info.summary, "status": info.status,
})
return BaseResponse({"list": return_data, "page": page, "pageSize": page_size, "total_count": total_count})
@production_route.route("add_production", methods=["POST"])
def run_add_production():
"""
:return:
"""
json_data = request.get_json()
production_name = json_data["production_name"]
title = json_data.get("title", "")
brand_id = json_data.get("brand_id", 0)
production_type_id = json_data.get("production_type_id", 0)
price = json_data.get("price", 0)
original_price = json_data.get("original_price", 0)
weight = json_data.get("weight", 0)
weight_unit = json_data.get("weight_unit", "g")
weight_error = json_data.get("weight_error", 0)
expiration_date = json_data.get("expiration_date", 0)
expiration_date_unit = json_data.get("expiration_date_unit", "月")
is_expiration_date = json_data.get("is_expiration_date", 0)
img = json_data.get("img", None)
tags = json_data.get("tags", None)
content = json_data.get("content", None)
summary = json_data.get("summary", None)
production_model = Production()
production_model.production_no = "todo"
production_model.production_name = production_name
production_model.title = title
production_model.brand_id = brand_id
production_model.production_type_id = production_type_id
production_model.price = price
production_model.original_price = original_price
production_model.weight = weight
production_model.weight_unit = weight_unit
production_model.weight_error = weight_error
production_model.expiration_date = expiration_date
production_model.expiration_date_unit = expiration_date_unit
production_model.is_expiration_date = is_expiration_date
production_model.img = img
production_model.tags = tags
production_model.content = content
production_model.summary = summary
db.session.add(production_model)
db.session.commit()
admin_production = AdminProduction()
admin_production.user_id = g.user.id
admin_production.user_no = g.user.user_no
admin_production.production_id = production_model.id
db.session.add(admin_production)
production_model.production_no = "PD" + str(production_model.id).zfill(8)
db.session.add(production_model)
db.session.commit()
return BaseResponse()
@production_route.route("edit_production", methods=["POST"])
def run_edit_production():
"""
:return:
"""
json_data = request.get_json()
production_id = json_data["production_id"]
production_name = json_data.get("production_name", None)
title = json_data.get("title", None)
brand_id = json_data.get("brand_id", None)
production_type_id = json_data.get("production_type_id", None)
price = json_data.get("price", None)
original_price = json_data.get("original_price", None)
weight = json_data.get("weight", None)
weight_unit = json_data.get("weight_unit", None)
weight_error = json_data.get("weight_error", None)
expiration_date = json_data.get("expiration_date", None)
expiration_date_unit = json_data.get("expiration_date_unit", None)
is_expiration_date = json_data.get("is_expiration_date", None)
img = json_data.get("img", None)
tags = json_data.get("tags", None)
content = json_data.get("content", None)
summary = json_data.get("summary", None)
production_model = Production.query.filter_by(id=production_id).first()
if not production_model:
return jsonify(NO_PRODUCTION_ERROR)
if production_name:
production_model.production_name = production_name
if title:
production_model.title = title
if brand_id:
production_model.brand_id = brand_id
if production_type_id:
production_model.production_type_id = production_type_id
if price:
production_model.price = price
if original_price:
production_model.original_price = original_price
if weight:
production_model.weight = weight
if weight_unit:
production_model.weight_unit = weight_unit
if weight_error:
production_model.weight_error = weight_error
if expiration_date:
production_model.expiration_date = expiration_date
if expiration_date_unit:
production_model.expiration_date_unit = expiration_date_unit
if is_expiration_date:
production_model.is_expiration_date = is_expiration_date
if img:
production_model.img = img
if tags:
production_model.tags = tags
if content:
production_model.content = content
if summary:
production_model.summary = summary
db.session.add(production_model)
db.session.commit()
return BaseResponse()
@production_route.route("production_detail", methods=["POST"])
def get_production_detail():
"""
:return:
"""
json_data = request.get_json()
production_id = json_data["production_id"]
admin = g.user
select_sql = """select p.id, p.production_no, p.production_name, p.title, p.brand_id,p.production_type_id, p.price,p.original_price,
p.weight, p.weight_unit, p.expiration_date, p.expiration_date_unit, p.is_expiration_date,
p.weight_error, p.img, p.tags, p.content,p.summary, p.status,
brand.brand_name as brand_name, production_type.production_type_name as production_type_name
"""
from_sql = """ from production p
left join brand on p.brand_id = brand.id
left join production_type on p.production_type_id = production_type.id
where p.id in (
select production_id from admin_production where
admin_production.user_id = {} and admin_production.status = 1
)
""".format(admin.id)
where_sql = " and p.id={}".format(production_id)
result = db.session.execute(select_sql + from_sql + where_sql).fetchall()
if not result or len(result) != 1:
return jsonify(NO_PRODUCTION_ERROR)
info = result[0]
return BaseResponse(
data={"production_name": info.production_name, "prodcution_id": info.id, "production_no": info.production_no,
"title": info.title, "brand_id": info.brand_id, "brand_name": info.brand_name,
"production_type_id": info.production_type_id, "production_type_name": info.production_type_name,
"price": info.price, "original_price": info.original_price, "weight": info.weight,
"weight_unit": info.weight_unit,
"expiration_date": info.expiration_date, "expiration_date_unit": info.expiration_date_unit,
"is_expiration_date": info.is_expiration_date,
"weight_error": info.weight_error,
"img": info.img, "tags": info.tags, "content": info.content,
"summary": info.summary, "status": info.status,
})
@production_route.route("brand_list", methods=["POST"])
def run_brand_list():
"""
:return:
"""
json_data = request.get_json()
page = json_data.get("page", 1)
page_size = json_data.get("pageSize", 10)
keyword = json_data.get("keyword", None)
admin = g.user
select_sql = """select b.id, b.brand_name,b.status, b.img
"""
count_sql = "select count(b.id) as total_count"
from_sql = """ from brand b where b.id in (
select brand_id from admin_brand where
admin_brand.user_id = {} and admin_brand.status = 1
)
""".format(admin.id)
where_sql = ""
if keyword:
where_sql += """
and CONCAT( b.brand_name) LIKE '%{keyword}%'
""".format(keyword=keyword)
order_sql = " ORDER BY b.id ASC, b.status ASC"
limit_sql = " LIMIT {offset} , {page_size} ".format(offset=(page - 1) * page_size, page_size=page_size)
count_result = db.session.execute(count_sql + from_sql + where_sql).fetchone()
if not count_result:
return BaseResponse(data={"list": [], "page": page, "pageSize": page_size, "total_count": 0})
else:
total_count = count_result.total_count
result = db.session.execute(select_sql + from_sql + where_sql + order_sql + limit_sql).fetchall()
return_data = []
for info in result:
return_data.append({"brand_id": info.id, "brand_name": info.brand_name, "status": info.status,
"img": info.img})
return BaseResponse({"list": return_data, "page": page, "pageSize": page_size, "total_count": total_count})
@production_route.route("add_brand", methods=["POST"])
def run_add_brand():
"""
:return:
"""
json_data = request.get_json()
brand_name = json_data["brand_name"]
img = json_data.get("img", None)
brand_model = Brand()
brand_model.brand_name = brand_name
brand_model.img = img
db.session.add(brand_model)
db.session.commit()
db.session.commit()
admin_brand = AdminBrand()
admin_brand.user_id = g.user.id
admin_brand.user_no = g.user.user_no
admin_brand.brand_id = brand_model.id
db.session.add(admin_brand)
db.session.commit()
return BaseResponse()
@production_route.route("edit_brand", methods=["POST"])
def run_edit_brand():
"""
:return:
"""
json_data = request.get_json()
brand_id = json_data["brand_id"]
brand_name = json_data.get("brand_name", None)
img = json_data.get("img", None)
status = json_data.get("status", None)
brand_model = Brand.query.filter_by(id=brand_id).first()
if not brand_model:
return jsonify(NO_PRODUCTION_ERROR)
if brand_name:
brand_model.brand_name = brand_name
if img:
brand_model.img = img
if status:
brand_model.status = status
db.session.add(brand_model)
db.session.commit()
return BaseResponse()
@production_route.route("brand_detail", methods=["POST"])
def get_brand_detail():
"""
:return:
"""
json_data = request.get_json()
brand_id = json_data["brand_id"]
admin = g.user
select_sql = """select b.id,b.brand_name,b.status
"""
from_sql = """ from brand b """
where_sql = " where b.id ={}".format(brand_id)
result = db.session.execute(select_sql + from_sql + where_sql).fetchall()
if not result or len(result) != 1:
return jsonify(NO_PRODUCTION_ERROR)
info = result[0]
return BaseResponse(
data={"brand_id": info.id, "brand_name": info.brand_name,
"status": info.status,
})
@production_route.route("production_type_list", methods=["POST"])
def run_production_type_list():
"""
:return:
"""
json_data = request.get_json()
page = json_data.get("page", 1)
page_size = json_data.get("pageSize", 10)
keyword = json_data.get("keyword", None)
admin = g.user
select_sql = """select p.id, p.production_type_name,p.status
"""
count_sql = "select count(p.id) as total_count"
from_sql = """ from production_type p where p.id in (
select production_type_id from admin_production_type where
admin_production_type.user_id = {} and admin_production_type.status = 1
)
""".format(admin.id)
where_sql = " "
if keyword:
where_sql += """
and CONCAT( p.production_type_name) LIKE '%{keyword}%'
""".format(keyword=keyword)
order_sql = " ORDER BY p.id ASC, p.status ASC"
limit_sql = " LIMIT {offset} , {page_size} ".format(offset=(page - 1) * page_size, page_size=page_size)
count_result = db.session.execute(count_sql + from_sql + where_sql).fetchone()
if not count_result:
return BaseResponse(data={"list": [], "page": page, "pageSize": page_size, "total_count": 0})
else:
total_count = count_result.total_count
result = db.session.execute(select_sql + from_sql + where_sql + order_sql + limit_sql).fetchall()
return_data = []
for info in result:
return_data.append(
{"production_type_id": info.id, "production_type_name": info.production_type_name, "status": info.status})
return BaseResponse({"list": return_data, "page": page, "pageSize": page_size, "total_count": total_count})
@production_route.route("add_production_type", methods=["POST"])
def run_add_production_type():
"""
:return:
"""
json_data = request.get_json()
production_type_name = json_data["production_type_name"]
img = json_data.get("img", None)
production_type_model = ProductionType()
production_type_model.production_type_name = production_type_name
production_type_model.img = img
db.session.add(production_type_model)
db.session.commit()
db.session.commit()
admin_production_type = AdminProductionType()
admin_production_type.user_id = g.user.id
admin_production_type.user_no = g.user.user_no
admin_production_type.production_type_id = production_type_model.id
db.session.add(admin_production_type)
db.session.commit()
return BaseResponse()
@production_route.route("edit_production_type", methods=["POST"])
def run_edit_production_type():
"""
:return:
"""
json_data = request.get_json()
production_type_id = json_data["production_type_id"]
production_type_name = json_data.get("production_type_name", None)
status = json_data.get("status", None)
production_type_model = ProductionType.query.filter_by(id=production_type_id).first()
if not production_type_model:
return jsonify(NO_PRODUCTION_ERROR)
if production_type_name:
production_type_model.production_type_name = production_type_name
if status:
production_type_model.status = status
db.session.add(production_type_model)
db.session.commit()
return BaseResponse()
@production_route.route("production_type_detail", methods=["POST"])
def get_production_type_detail():
"""
:return:
"""
json_data = request.get_json()
production_type_id = json_data["production_type_id"]
admin = g.user
select_sql = """select b.id,b.production_type_name,b.status
"""
from_sql = """ from production_type b """
where_sql = " where b.id ={}".format(production_type_id)
result = db.session.execute(select_sql + from_sql + where_sql).fetchall()
if not result or len(result) != 1:
return jsonify(NO_PRODUCTION_ERROR)
info = result[0]
return BaseResponse(
data={"production_type_id": info.id, "production_type_name": info.production_type_name,
"status": info.status,
})
[uwsgi]
# uwsgi 启动时所使用的地址与端口
socket = 127.0.0.1:8894
# 指向网站目录
chdir = /data/www/automat
# python 启动程序文件
wsgi-file = pc_management_app.py
# python 程序内用以启动的 application 变量名
callable = app
# 处理器数
processes = 2
# 线程数
threads = 4
#!usr/bin/env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @file: error_code.py """ ### 通用错误相关 Param_Invalid_Error = { "error_code": "500", "error_message": "params is invalid, 参数无效" } TOKEN_NOT_VALID_ERROR = { "error_code": "1001", "error_message": "无效的token" } TOKEN_NOT_PROVIDED_ERROR = { "error_code": "1002", "error_message": "token未提供" } TOKEN_EXPIRE_ERROR = { "error_code": "1003", "error_message": "token超时" } PHONE_NOT_BINDING_ERROR = { "error_code": "1004", "error_message": "未绑定手机号" } PHONE_NOT_NULL_ERROR = { "error_code": "1005", "error_message": "手机号为空" } PHONE_NOT_VALID_ERROR = { "error_code": "1006", "error_message": "无效的手机号" } USER_ALREADY_REGISTER_ERROR = { "error_code": "1007", "error_message": "用户已注册" } VERIFICATION_CODE_NULL_ERROR = { "error_code": "1008", "error_message": "验证码为空" } VERIFICATION_CODE_INVALID_ERROR = { "error_code": "1009", "error_message": "验证码已失效" } VERIFICATION_CODE_ERROR = { "error_code": "1010", "error_message": "验证码错误" } PASSWORD_ERROR = { "error_code": "1011", "error_message": "账号或密码错误" } # 账号相关 12开头 ACCOUNT_ALREADY_EXISTS_ERROR = { "error_code": '1012', "error_message": "该账号已存在" } ACCOUNT_NOT_EXISTS_ERROR = { "error_code": '1013', "error_message": "账号不存在" } ACCOUNT_ALREADY_DELETE_ERROR = { "error_code": '1014', "error_message": "账号已被删除" } ACCOUNT_AGENT_SPOT_NULL_ERROR = { "error_code": '1015', "error_message": "代理商景点列表为空" } AGNET_MODULES_ERROR = { "error_code": '1016', "error_message": "用户未绑定模块" } OPERATE_TYPE_ERROR = { "error_code": '1017', "error_message": "type错误" } OPERATE_LEVEL_ERROR = { "error_code": '1018', "error_message": "权限错误" } OPERATE_ERROR = { "error_code": '1019', "error_message": "操作有误" } MODULES_NOT_EXISTS_ERROR = { "error_code": '1020', "error_message": "modules not exists,模块不存在" } ACCOUNT_AGENT_SPOT_NOT_EXIST = { "error_code": '1021', "error_message": "agent spot not exists,代理景区不存在" } AGENT_MACHINE_NOT_EXIST = { "error_code": '1022', "error_message": "agent machine not exists,代理机柜不存在" } ## 微信登陆相关 WX_LOGIN_DATA_ERROR = { "error_code": "3001", "error_message": "微信登录数据错误" } WX_LOGIN_CODE_ERROR = { "error_code": "3002", "error_message": "微信登录code值错误" } WX_OPENID_NOT_GET_ERROR = { "error_code": "3003", "error_message": "微信OpenId获取失败,请刷新重试" } WX_SESSION_KEY_ERROR = { "error_code": "3004", "error_message": "session key error" } ### 微信支付相关 WE_MINIAPP_PAY_FAIL = { "error_code": "3101", "error_message": "小程序下单失败" } ### 消息推送相关 WXBizMsgCrypt_OK = { "error_code": "0", "error_message": "WXBizMsgCrypt_OK" } WXBizMsgCrypt_ValidateSignature_Error = { "error_code": "4001", "error_message": "验证签名错误" } WXBizMsgCrypt_ParseXml_Error = { "error_code": "4002", "error_message": "解析xml错误" } WXBizMsgCrypt_ComputeSignature_Error = { "error_code": "4003", "error_message": "计算签名错误" } WXBizMsgCrypt_IllegalAesKey = { "error_code": "4004", "error_message": "Aes key非法错误" } WXBizMsgCrypt_ValidateAppid_Error = { "error_code": "4005", "error_message": "appid错误" } WXBizMsgCrypt_EncryptAES_Error = { "error_code": "4006", "error_message": "aes加密错误" } WXBizMsgCrypt_DecryptAES_Error = { "error_code": "4007", "error_message": "aes解密错误" } WXBizMsgCrypt_IllegalBuffer = { "error_code": "4008", "error_message": "illegal buffer" } WXBizMsgCrypt_EncodeBase64_Error = { "error_code": "4009", "error_message": "base64加密错误" } WXBizMsgCrypt_DecodeBase64_Error = { "error_code": "4010", "error_message": "base64解密错误" } WXBizMsgCrypt_GenReturnXml_Error = { "error_code": "4011", "error_message": "gen return xml error" } MACHINE_NOT_EXIST_ERROR = { "error_code": '5001', "error_message": "机柜不存在" } MACHINE_IS_USE_ERROR = { "error_code": '5002', "error_message": "已有他人正在租借中,请稍后" } MACHINE_IS_NOT_ONLINE_ERROR = { "error_code": '5003', "error_message": "机柜不在线" } MACHINE_ADD_ERROR = { "error_code": '5004', "error_message": "机柜添加失败" } MACHINE_NO_DUPLICATE_ERROR = { "error_code": '5005', "error_message": "machine_no duplicate,机柜编号重复" } MACHINE_EDIT_ERROR = { "error_code": '5006', "error_message": "machine edit error, 机柜修改错误" } HATCH_NOT_EXIST_ERROR = { "error_code": "5007", "error_message": "no hatch, 没有商品信息" } HATCH_NOT_ALL_EXIST_ERROR = { "error_code": "5008", "error_message": "no all hatch, 存在已售出商品" } HATCH_COUNT_ERROR = { "error_code": "5009", "error_message": "hatch count error, 商品数量错误,检查数量" } MACHINE_ACTIVATED_ERROR = { "error_code": '5010', "error_message": "machine activated, 机柜已激活" } MACHINE_NO_CREATE_ERROR = { "error_code": '5011', "error_message": "machine_no create error , 机柜编号生成错误" } MACHINE_MAC_DUPLICATE_ERROR = { "error_code": '5012', "error_message": "machine_mac duplicate,机柜mac重复" } ### 订单相关 RENT_ORDER_NOT_BACK_ERROR = { "error_code": '6101', "error_message": "有未归还的订单" } RENT_ORDER_NOT_TAKE_ERROR = { "error_code": '6102', "error_message": "有未取货的订单" } RENT_ORDER_NUMBER_MAX = { "error_code": '6103', "error_message": "订单数量达到上限" } TAKE_CODE_NOT_VALID = { "error_code": '6104', "error_message": "取货码错误请确认手机号及取货码是否匹配" } CODE_CANCEL_ERROR = { "error_code": '6105', "error_message": "取货码已取消" } CODE_USED_ERROR = { "error_code": '6108', "error_message": "取货码已使用" } NO_POWER_ERROR = { "error_code": '6106', "error_message": "没有可租借设备" } NO_RENT_RECORD = { "error_code": '6107', "error_message": "订单不存在" } CODE_USED_ERROR = { "error_code": '6108', "error_message": "取货码已使用" } RENT_ORDER_NUMBER_LIMIT = { "error_code": '6109', "error_message": "机柜只允许租借一台" } REFUND_NOT_RENT_INFO = { "error_code": "6301", "error_message": "没有该订单信息" } REFUND_BACK_TIME_ERROR = { "error_code": "6302", "error_message": "归还时间异常" } REFUND_NOT_PRODUCTION_INFO = { "error_code": "6303", "error_message": "没有该讲解器信息" } REFUND_MONEY_IS_ZERO = { "error_code": "6304", "error_message": "退款金额为零" } REFUND_NO_DUPLICATE = { "error_code": "6305", "error_message": "退款单号重复" } TALLYMAN_ACCOUNT_EXIST = { "error_code": "7001", "error_message": "tallyman account exist, 补货员账号已存在" } TALLYMAN_ACCOUNT_NOT_EXIST = { "error_code": "7002", "error_message": "tallyman account not exist, 补货员账号不存在" } NFC_CARD_NOT_EXIST = { "error_code": "8001", "error_message": "nfc card not exist, 卡号错误" } NFC_CARD_ACTIVATED_ERROR = { "error_code": "8002", "error_message": "nfc card activated, 卡片已激活" } NO_NFC_CARD_ERROR = { "error_code": "8003", "error_message": "no nfc card , 不存在卡片" } RE_NFC_CARD_ERROR = { "error_code": "8004", "error_message": "re nfc card , 卡片已存在" } NFC_PAY_LOAD_SECRET_ERROR = { "error_code": "8005", "error_message": "secret error , 身份验证失败" } NO_PLACE_ERROR = { "error_code": "9001", "error_message": "no place error,不存在场景" } # COMMON_MONGO_ERROR = { "error_code": 5001, "error_message": "mongodb operation error, mongodb操作错误" }
\ No newline at end of file
#!usr/bin/env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @file: error_code.py """ ### 通用错误相关 Param_Invalid_Error = { "error_code": "500", "error_message": "params is invalid, 参数无效" } TOKEN_NOT_VALID_ERROR = { "error_code": "1001", "error_message": "无效的token" } TOKEN_NOT_PROVIDED_ERROR = { "error_code": "1002", "error_message": "token未提供" } TOKEN_EXPIRE_ERROR = { "error_code": "1003", "error_message": "token超时" } PHONE_NOT_BINDING_ERROR = { "error_code": "1004", "error_message": "未绑定手机号" } PHONE_NOT_NULL_ERROR = { "error_code": "1005", "error_message": "手机号为空" } PHONE_NOT_VALID_ERROR = { "error_code": "1006", "error_message": "无效的手机号" } USER_ALREADY_REGISTER_ERROR = { "error_code": "1007", "error_message": "用户已注册" } VERIFICATION_CODE_NULL_ERROR = { "error_code": "1008", "error_message": "验证码为空" } VERIFICATION_CODE_INVALID_ERROR = { "error_code": "1009", "error_message": "验证码已失效" } VERIFICATION_CODE_ERROR = { "error_code": "1010", "error_message": "验证码错误" } PASSWORD_ERROR = { "error_code": "1011", "error_message": "账号或密码错误" } # 账号相关 12开头 ACCOUNT_ALREADY_EXISTS_ERROR = { "error_code": '1012', "error_message": "该账号已存在" } ACCOUNT_NOT_EXISTS_ERROR = { "error_code": '1013', "error_message": "账号不存在" } ACCOUNT_ALREADY_DELETE_ERROR = { "error_code": '1014', "error_message": "账号已被删除" } ACCOUNT_AGENT_SPOT_NULL_ERROR = { "error_code": '1015', "error_message": "代理商景点列表为空" } AGNET_MODULES_ERROR = { "error_code": '1016', "error_message": "用户未绑定模块" } OPERATE_TYPE_ERROR = { "error_code": '1017', "error_message": "type错误" } OPERATE_LEVEL_ERROR = { "error_code": '1018', "error_message": "权限错误" } OPERATE_ERROR = { "error_code": '1019', "error_message": "操作有误" } MODULES_NOT_EXISTS_ERROR = { "error_code": '1020', "error_message": "modules not exists,模块不存在" } ACCOUNT_AGENT_SPOT_NOT_EXIST = { "error_code": '1021', "error_message": "agent spot not exists,代理景区不存在" } AGENT_MACHINE_NOT_EXIST = { "error_code": '1022', "error_message": "agent machine not exists,代理机柜不存在" } ## 微信登陆相关 WX_LOGIN_DATA_ERROR = { "error_code": "3001", "error_message": "微信登录数据错误" } WX_LOGIN_CODE_ERROR = { "error_code": "3002", "error_message": "微信登录code值错误" } WX_OPENID_NOT_GET_ERROR = { "error_code": "3003", "error_message": "微信OpenId获取失败,请刷新重试" } WX_SESSION_KEY_ERROR = { "error_code": "3004", "error_message": "session key error" } ### 微信支付相关 WE_MINIAPP_PAY_FAIL = { "error_code": "3101", "error_message": "小程序下单失败" } ### 消息推送相关 WXBizMsgCrypt_OK = { "error_code": "0", "error_message": "WXBizMsgCrypt_OK" } WXBizMsgCrypt_ValidateSignature_Error = { "error_code": "4001", "error_message": "验证签名错误" } WXBizMsgCrypt_ParseXml_Error = { "error_code": "4002", "error_message": "解析xml错误" } WXBizMsgCrypt_ComputeSignature_Error = { "error_code": "4003", "error_message": "计算签名错误" } WXBizMsgCrypt_IllegalAesKey = { "error_code": "4004", "error_message": "Aes key非法错误" } WXBizMsgCrypt_ValidateAppid_Error = { "error_code": "4005", "error_message": "appid错误" } WXBizMsgCrypt_EncryptAES_Error = { "error_code": "4006", "error_message": "aes加密错误" } WXBizMsgCrypt_DecryptAES_Error = { "error_code": "4007", "error_message": "aes解密错误" } WXBizMsgCrypt_IllegalBuffer = { "error_code": "4008", "error_message": "illegal buffer" } WXBizMsgCrypt_EncodeBase64_Error = { "error_code": "4009", "error_message": "base64加密错误" } WXBizMsgCrypt_DecodeBase64_Error = { "error_code": "4010", "error_message": "base64解密错误" } WXBizMsgCrypt_GenReturnXml_Error = { "error_code": "4011", "error_message": "gen return xml error" } MACHINE_NOT_EXIST_ERROR = { "error_code": '5001', "error_message": "机柜不存在" } MACHINE_IS_USE_ERROR = { "error_code": '5002', "error_message": "已有他人正在租借中,请稍后" } MACHINE_IS_NOT_ONLINE_ERROR = { "error_code": '5003', "error_message": "机柜不在线" } MACHINE_ADD_ERROR = { "error_code": '5004', "error_message": "机柜添加失败" } MACHINE_NO_DUPLICATE_ERROR = { "error_code": '5005', "error_message": "machine_no duplicate,机柜编号重复" } MACHINE_EDIT_ERROR = { "error_code": '5006', "error_message": "machine edit error, 机柜修改错误" } HATCH_NOT_EXIST_ERROR = { "error_code": "5007", "error_message": "no hatch, 没有商品信息" } HATCH_NOT_ALL_EXIST_ERROR = { "error_code": "5008", "error_message": "no all hatch, 存在已售出商品" } HATCH_COUNT_ERROR = { "error_code": "5009", "error_message": "hatch count error, 商品数量错误,检查数量" } MACHINE_ACTIVATED_ERROR = { "error_code": '5010', "error_message": "machine activated, 机柜已激活" } MACHINE_NO_CREATE_ERROR = { "error_code": '5011', "error_message": "machine_no create error , 机柜编号生成错误" } MACHINE_MAC_DUPLICATE_ERROR = { "error_code": '5012', "error_message": "machine_mac duplicate,机柜mac重复" } ### 订单相关 RENT_ORDER_NOT_BACK_ERROR = { "error_code": '6101', "error_message": "有未归还的订单" } RENT_ORDER_NOT_TAKE_ERROR = { "error_code": '6102', "error_message": "有未取货的订单" } RENT_ORDER_NUMBER_MAX = { "error_code": '6103', "error_message": "订单数量达到上限" } TAKE_CODE_NOT_VALID = { "error_code": '6104', "error_message": "取货码错误请确认手机号及取货码是否匹配" } CODE_CANCEL_ERROR = { "error_code": '6105', "error_message": "取货码已取消" } CODE_USED_ERROR = { "error_code": '6108', "error_message": "取货码已使用" } NO_POWER_ERROR = { "error_code": '6106', "error_message": "没有可租借设备" } NO_RENT_RECORD = { "error_code": '6107', "error_message": "订单不存在" } CODE_USED_ERROR = { "error_code": '6108', "error_message": "取货码已使用" } RENT_ORDER_NUMBER_LIMIT = { "error_code": '6109', "error_message": "机柜只允许租借一台" } REFUND_NOT_RENT_INFO = { "error_code": "6301", "error_message": "没有该订单信息" } REFUND_BACK_TIME_ERROR = { "error_code": "6302", "error_message": "归还时间异常" } REFUND_NOT_PRODUCTION_INFO = { "error_code": "6303", "error_message": "没有该讲解器信息" } REFUND_MONEY_IS_ZERO = { "error_code": "6304", "error_message": "退款金额为零" } REFUND_NO_DUPLICATE = { "error_code": "6305", "error_message": "退款单号重复" } TALLYMAN_ACCOUNT_EXIST = { "error_code": "7001", "error_message": "tallyman account exist, 补货员账号已存在" } TALLYMAN_ACCOUNT_NOT_EXIST = { "error_code": "7002", "error_message": "tallyman account not exist, 补货员账号不存在" } NFC_CARD_NOT_EXIST = { "error_code": "8001", "error_message": "nfc card not exist, 卡号错误" } NFC_CARD_ACTIVATED_ERROR = { "error_code": "8002", "error_message": "nfc card activated, 卡片已激活" } NO_NFC_CARD_ERROR = { "error_code": "8003", "error_message": "no nfc card , 不存在卡片" } RE_NFC_CARD_ERROR = { "error_code": "8004", "error_message": "re nfc card , 卡片已存在" } NFC_PAY_LOAD_SECRET_ERROR = { "error_code": "8005", "error_message": "secret error , 身份验证失败" } # 9 场所相关 NO_PLACE_ERROR = { "error_code": "9001", "error_message": "no place error,不存在场景" } # 10 商品相关 NO_PRODUCTION_ERROR = { "error_code": "10001", "error_message": "no production error,商品不存在" } NO_BRAND_ERROR = { "error_code": "10002", "error_message": "no production error,品牌不存在" } # COMMON_MONGO_ERROR = { "error_code": 5001, "error_message": "mongodb operation error, mongodb操作错误" }
\ No newline at end of file
......
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount, AdminAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ path_list = request.path.split("/") if current_app.name == "automat": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), url_for('machine.run_get_machine_no'), url_for('machine.run_create_machine_no'), url_for('machine.run_bind_serial_num'), url_for('nfc_card.run_nfc_card_wx_pay_callback'), url_for('nfc_card.run_nfc_card_user_pay_record'), url_for('nfc_card.run_nfc_card_load_succeed'), url_for('nfc_card.run_nfc_card_user_load_record'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) if request.path.split("/")[2] == "machine": user_no = payload.get('user_no', None) user_id = payload.get('user_id', None) if user_no: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) if user_id: try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) return BaseResponse(**TOKEN_NOT_VALID_ERROR) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) elif current_app.name == "pc_management": NO_AUTH_CHECK_URL = [url_for("admin.user_login"), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id', None) if user_id: g.user = AdminAccount.query.filter_by(id=user_id).first() if g.user: return return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] return def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount, AdminAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ path_list = request.path.split("/") if current_app.name == "automat": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), url_for('machine.run_get_machine_no'), url_for('machine.run_create_machine_no'), url_for('machine.run_bind_serial_num'), url_for('nfc_card.run_nfc_card_wx_pay_callback'), url_for('nfc_card.run_nfc_card_user_pay_record'), url_for('nfc_card.run_nfc_card_load_succeed'), url_for('nfc_card.run_nfc_card_user_load_record'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) if request.path.split("/")[2] == "machine": user_no = payload.get('user_no', None) user_id = payload.get('user_id', None) if user_no: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) if user_id: try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) return BaseResponse(**TOKEN_NOT_VALID_ERROR) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) elif current_app.name == "pc_management": NO_AUTH_CHECK_URL = [url_for("admin.user_login") ] if request.path.split("/")[2] == "file" and request.path.split("/")[3] == "img": # 图片接口不验证token return if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id', None) if user_id: g.user = AdminAccount.query.filter_by(id=user_id).first() if g.user: return return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] return def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment