Commit 04107188 by Aeolus

update

parent e457171c
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
__pycache__/
.idea/
*.pyc
.gz
.txt
*.zip
.log*
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Cert
*.pem
*.pfx
*.key
*.crt
*.cer
*.truststore
# IDE
*.DS_Store
# Media
*.jpg
\ No newline at end of file
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/25
@file: app.py.py
@function:
@modify:
"""
import os
import logging
from app import create_app
logger = logging.getLogger(__name__)
app = create_app('production')
# app = create_app(os.getenv('RUN_ENV', 'development'))
app.run('127.0.0.1', 8888)
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/26
@file: __init__.py.py
@function:
@modify:
"""
import logging
from flask import Flask
from flask_log_request_id import RequestID
from flask_sqlalchemy import SQLAlchemy
from config import config
from mylogger import MyLogger
from utils import jwt_authentication
db = SQLAlchemy()
def create_app(config_name):
print(config_name)
app = Flask(__name__)
app.config.from_object(config[config_name])
db.init_app(app)
RequestID(app)
MyLogger()
# app.before_request(jwt_authentication)
# todo register blueprint
from app.api import api
from app.api.repair import repair
app.register_blueprint(api, url_prefix='/api')
app.register_blueprint(repair, url_prefix='/api/repair')
return app
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/26
@file: __init__.py.py
@function:
@modify:
"""
import logging
from flask import Blueprint, url_for
logger = logging.getLogger(__name__)
api = Blueprint('api', __name__)
@api.route('test', methods=['GET', 'POST'])
def get_test():
logger.info('test')
return 'test'
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: repair.py
@function:
@modify:
"""
import logging
from flask import Blueprint, request
from app import db
from models.Repairs import PowerRepairModel
from utils.return_code import BASE_RESPONSE, PARAMS_INVALID
logger = logging.getLogger(__name__)
repair = Blueprint('repair', __name__)
@repair.route("/update", methods=["POST"])
def update_repair():
nfc_id = request.values.get("nfc_id", None)
power_sn = request.values.get("sn", None)
repair_content = request.values.get("repair_content", None)
repair_man = request.values.get("repair_man", None)
index = request.values.get("index", None)
if not nfc_id or not power_sn or not repair_content or not repair_man:
return BASE_RESPONSE(**PARAMS_INVALID)
logger.info('=======================')
model = PowerRepairModel()
model.power_sn = power_sn
model.nfc_id = nfc_id
model.repair_content = repair_content
model.repair_mane = repair_man
model.index = index
db.session.add(model)
result = db.session.commit()
return BASE_RESPONSE()
@repair.route("/list", methods=["POST"])
def list_repair():
power_sn = request.values.get("sn", None)
if not power_sn:
return BASE_RESPONSE(**PARAMS_INVALID)
logger.info('=======================')
return_data = []
result = PowerRepairModel.query.filter_by(power_sn=power_sn).order_by(PowerRepairModel.index.asc()).all()
if result:
for i in result:
return_data.append({"sn": i.power_sn,
"nfc_id": i.nfc_id,
"repair_content": i.repair_content,
"repair_man": i.repair_man,
"index": i.index,
})
else:
return_data = []
return BASE_RESPONSE(data=return_data)
#!usr/bin/env python
#-*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/25
@file: __init__.py.py
@function:
@modify:
"""
\ No newline at end of file
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/25
@file: app.py.py
@function:
@modify:
"""
import os
from flask import copy_current_request_context
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY');
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_RECORD_QUERIES = True
SQLALCHEMY_POOL_SIZE = 10
SQLALCHEMY_POOL_RECYCLE = 1800
@staticmethod
def init_app(app):
pass
class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:N58uTiMT#tt*@sh-cdb-9yr5zhu0.sql.tencentcdb.com:61088/suishenwan'
class ProductionConfig(Config):
print("here is ProductionConfig")
DEBUG = False
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:Boosal2014!!@sh-cdb-kzmpyjqw.sql.tencentcdb.com:63037/suishenwan'
@classmethod
def init_app(cls, app):
Config.init_app(app)
config = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: Base.py
@function:
@modify:
"""
from sqlalchemy import Column, func
from app import db
class BaseModel(db.Model):
__abstract__ = True
created_at = Column(db.TIMESTAMP, nullable=False, server_default=func.now())
updated_at = Column(db.TIMESTAMP, nullable=False, server_default=func.now(), onupdate=func.now())
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: Repairs.py
@function:
@modify:
"""
from sqlalchemy import Column, String, Integer
from models.Base import BaseModel
class PowerRepairModel(BaseModel):
__tablename__ = 'power_repair'
id = Column(Integer, primary_key=True)
power_sn = Column(String(20, 'utf8mb4_unicode_ci'), nullable=False, comment='sn')
nfc_id = Column(String(20, 'utf8mb4_unicode_ci'), nullable=False, comment='nfc_id')
repair_content = Column(String(40, 'utf8mb4_unicode_ci'), nullable=False, comment='维修内容')
repair_man = Column(String(20, 'utf8mb4_unicode_ci'), nullable=False, comment='维修人员')
index = Column(Integer, nullable=False, comment='维修次数')
status = Column(Integer, nullable=False, default=1, comment='1正常 -1删除')
#!usr/bin/env python
#-*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/25
@file: __init__.py.py
@function:
@modify:
"""
\ No newline at end of file
import logging
from flask_log_request_id import RequestIDLogFilter
class MyLogger(object):
def init_app(self, app):
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(
logging.Formatter(
"%(asctime)s - %(filename)s - %(funcName)s -[line:%(lineno)d] - %(levelname)s - request_id=%(request_id)s: %(message)s",
"%Y-%m-%d %H:%M:%S"))
console_handler.addFilter(RequestIDLogFilter())
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(console_handler)
# "添加请求钩子"
from utils.middlewares import jwt_authentication
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/30
@file: jwt_util.py
@function:
@modify:
"""
import jwt
from flask import current_app
def generate_jwt(payload, expiry, secret=None):
"""
生成jwt
:param payload: dict 载荷
:param expiry: datetime 有效期
:param secret: 密钥
:return: jwt
"""
_payload = {'exp': expiry}
_payload.update(payload)
if not secret:
secret = current_app.config['JWT_SECRET']
token = jwt.encode(_payload, secret, algorithm='HS256')
return token.decode()
def verify_jwt(token, secret=None):
"""
检验jwt
:param token: jwt
:param secret: 密钥
:return: dict: payload
"""
if not secret:
secret = current_app.config['JWT_SECRET']
try:
payload = jwt.decode(token, secret, algorithm=['HS256'])
except jwt.PyJWTError:
payload = None
return payload
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/26
@file: middlewares.py
@function:
@modify:
"""
from flask import g, request
from . import jwt_util
"""用户认证机制==>每次请求前获取并校验token"""
"@app.before_request 不使@调用装饰器 在 init文件直接装饰"
def jwt_authentication():
"""
1.获取请求头Authorization中的token
2.判断是否以 Bearer开头
3.使用jwt模块进行校验
4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存
"""
auth = request.headers.get('Authorization')
if auth and auth.startswith('Bearer '):
"提取token 0-6 被Bearer和空格占用 取下标7以后的所有字符"
token = auth[7:]
"校验token"
payload = jwt_util.verify_jwt(token)
"判断token的校验结果"
if payload:
"获取载荷中的信息赋值给g对象"
g.user_id = payload.get('user_id')
g.refresh = payload.get('refresh')
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: return_code.py
@function:
@modify:
"""
from flask import Response
from flask.json import dumps
class BASE_RESPONSE(Response):
def __init__(self, data=None, error_code=0, error_message='Success'):
if data:
result = dumps(dict(data=data, error_code=error_code, error_message=error_message))
else:
result = dumps(dict(error_code=error_code, error_message=error_message))
Response.__init__(self, result, mimetype='application/json')
PARAMS_INVALID = {
"error_code": 1001,
"error_message": "params invalid"
}
[uwsgi]
# uwsgi 启动时所使用的地址与端口
socket = 127.0.0.1:5051
# 指向网站目录
chdir = /data/www/bestour
# python 启动程序文件
wsgi-file = app.py
# python 程序内用以启动的 application 变量名
callable = app
# 处理器数
processes = 1
# 线程数
threads = 2
daemonize = /data/www/bestour/server.log
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment