Commit 4bf50099 by Aeolus

接口开发

parent 129b3e6a
...@@ -40,3 +40,21 @@ LOGIN_TYPE = { ...@@ -40,3 +40,21 @@ LOGIN_TYPE = {
'send_code': 3, 'send_code': 3,
'password': 4 'password': 4
} }
AGENT_STATUS = {
'1': '超级管理员',
'2': '出库员',
'3': '渠道经理',
'4': '财务',
'5': '运维管理员',
'6': '推销员',
'7': '介绍人',
'8': '合伙人',
'9': '补货员',
'10': '场所',
}
ACCOUNT_STATUS = {
'on_use': 1,
'delete': 2
}
...@@ -6,6 +6,74 @@ from werkzeug.security import generate_password_hash, check_password_hash ...@@ -6,6 +6,74 @@ from werkzeug.security import generate_password_hash, check_password_hash
from models.base_model import Base from models.base_model import Base
class AdminAccount(Base):
__tablename__ = 'admin_account'
id = Column(INTEGER(10), primary_key=True, unique=True)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
user_name = Column(String(255, 'utf8mb4_unicode_ci'), nullable=False)
phone = Column(String(191, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
level = Column(INTEGER(2), nullable=False)
parent_id = Column(INTEGER(10), nullable=False)
draw = Column(INTEGER(1), nullable=False)
rate = Column(INTEGER(1), nullable=False)
status = Column(INTEGER(1), nullable=False)
_password_hash_ = Column(String(255, 'utf8mb4_unicode_ci'))
comment = Column(String(255, 'utf8mb4_unicode_ci'))
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
@property
def password(self):
raise Exception('密码不能被读取') # 为了保持使用习惯,还是设置一个password字段用来设置密码,当然也不能被读取。
# 赋值password,则自动加密存储。
@password.setter
def password(self, value):
self._password_hash_ = generate_password_hash(value)
# 使用check_password,进行密码校验,返回True False。
def check_password(self, pasword):
return check_password_hash(self._password_hash_, pasword)
class AdminLoginRecord(Base):
__tablename__ = 'admin_login_record'
id = Column(INTEGER(10), primary_key=True)
phone = Column(VARCHAR(40), nullable=False)
platform = Column(TINYINT(4), nullable=False, server_default=text("'1'"))
ip = Column(VARCHAR(40), nullable=False)
last_login = Column(DateTime, nullable=False)
login_type = Column(INTEGER(1), nullable=False)
created_at = Column(DateTime, nullable=False)
updated_at = Column(DateTime, nullable=False)
class AdminMachine(Base):
__tablename__ = 'admin_machine'
id = Column(INTEGER(11), primary_key=True)
user_id = Column(INTEGER(11), nullable=False)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False)
machine_no = Column(INTEGER(11), nullable=False)
status = Column(INTEGER(1), nullable=False)
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class AdminPlace(Base):
__tablename__ = 'admin_place'
id = Column(INTEGER(11), primary_key=True)
user_id = Column(INTEGER(11), nullable=False)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False)
place_id = Column(INTEGER(11), nullable=False)
status = Column(INTEGER(1), nullable=False)
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class Brand(Base): class Brand(Base):
__tablename__ = 'brand' __tablename__ = 'brand'
...@@ -356,6 +424,7 @@ class WxUser(Base): ...@@ -356,6 +424,7 @@ class WxUser(Base):
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP")) created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")) updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class Management(Base): class Management(Base):
__tablename__ = 'management_login' __tablename__ = 'management_login'
...@@ -384,6 +453,7 @@ class Management(Base): ...@@ -384,6 +453,7 @@ class Management(Base):
def check_password(self, pasword): def check_password(self, pasword):
return check_password_hash(self._password_hash_, pasword) return check_password_hash(self._password_hash_, pasword)
class TallymanPlace(Base): class TallymanPlace(Base):
__tablename__ = 'tallyman_place' __tablename__ = 'tallyman_place'
id = Column(INTEGER(11), primary_key=True, unique=True) id = Column(INTEGER(11), primary_key=True, unique=True)
......
This diff is collapsed. Click to expand it.
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2022/01/10
@file: __init__.py.py
@function:
@modify:
"""
from flask import Flask
from flask_cors import CORS
from flask_log_request_id import RequestID
from dotenv import load_dotenv
from models.base_model import db
from utils.my_redis_cache import redis_client
from utils.mylogger import set_logger
def create_app(config_name):
from config.env_path_config import env_path
load_dotenv(dotenv_path=env_path, verbose=True, override=True)
set_logger()
app = Flask("pc_management")
from config.app_config import config
app.config.from_object(config[config_name])
CORS(app)
db.init_app(app)
redis_client.init_app(app)
RequestID(app)
from utils.middlewares import jwt_authentication, log_enter_interface, log_out_interface, close_db_session, \
get_platform, all_options_pass
app.before_request(log_enter_interface)
app.before_request(all_options_pass)
app.before_request(get_platform)
app.before_request(jwt_authentication)
app.after_request(log_out_interface)
app.after_request(close_db_session)
# todo register blueprint
from myapps.pc_management.api import register_sukang_blueprint
register_sukang_blueprint(app)
return app
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2022/01/10
@file: __init__.py.py
@function:
@modify:
"""
from flask import Flask
from myapps.pc_management.api.admin_portal import admin_route
def register_sukang_blueprint(app: Flask):
prefix = "/pc_management"
app.register_blueprint(admin_route, url_prefix=prefix + "/admin")
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/08/05
@file: index_portal.py
@function:
@modify:
"""
import datetime
from flask import Blueprint, g
import logging
from sqlalchemy import func
from models.agent_model import AgentSpot
from models.base_model import db
from models.rent_models import Production
from models.spot_models import Spot
from service.index_service import IndexService
from utils.Helper import Helper
from utils.error_code.account_error import ACCOUNT_AGENT_SPOT_NOT_EXIST
from utils.my_response import BaseResponse
logger = logging.getLogger(__name__)
route_index = Blueprint('index', __name__)
@route_index.route("/day_income", methods=["GET"])
def get_today_income_data():
agent = g.user
return_data = {"today_income": None, "today_count": None, 'yesterday_income': None, 'yesterday_count': None}
agent_spot_list = AgentSpot.query.filter_by(agent_no=agent.id, status=1).all()
if agent_spot_list:
spot_no_list = [i.spot_no for i in agent_spot_list]
else:
return BaseResponse(data=return_data)
today, tomorrow = Helper.get_today_date()
today_income, today_count = IndexService.get_total_production(spot_no_list, today, tomorrow)
return_data["today_income"] = today_income
return_data["today_count"] = today_count
today, yesterday = Helper.get_yesterday_date()
yesterday_income, yesterday_count = IndexService.get_total_production(spot_no_list, today, yesterday)
return_data["yesterday_income"] = yesterday_income
return_data["yesterday_count"] = yesterday_count
return BaseResponse(data=return_data)
@route_index.route("/week_income", methods=["GET"])
def get_week_income_data():
agent = g.user
return_data = {"week_income": None, "week_count": None, 'last_week_income': None, 'last_week_count': None}
agent_spot_list = AgentSpot.query.filter_by(agent_no=agent.id, status=1).all()
if agent_spot_list:
spot_no_list = [i.spot_no for i in agent_spot_list]
else:
return BaseResponse(data=return_data)
monday, next_monday = Helper.get_week_date()
today_income, today_count = IndexService.get_total_production(spot_no_list, monday, next_monday)
return_data["week_income"] = today_income
return_data["week_count"] = today_count
last_monday, monday = Helper.get_last_week_date()
yesterday_income, yesterday_count = IndexService.get_total_production(spot_no_list, last_monday, monday)
return_data["last_week_income"] = yesterday_income
return_data["last_week_count"] = yesterday_count
return BaseResponse(data=return_data)
@route_index.route("/month_income", methods=["GET"])
def get_month_income_data():
agent = g.user
return_data = {"month_income": None, "month_count": None, 'last_month_income': None, 'last_month_count': None}
agent_spot_list = AgentSpot.query.filter_by(agent_no=agent.id, status=1).all()
if agent_spot_list:
spot_no_list = [i.spot_no for i in agent_spot_list]
else:
return BaseResponse(data=return_data)
month, next_month = Helper.get_month_date()
today_income, today_count = IndexService.get_total_production(spot_no_list, month, next_month)
return_data["month_income"] = today_income
return_data["month_count"] = today_count
last_month, month = Helper.get_last_month_date()
yesterday_income, yesterday_count = IndexService.get_total_production(spot_no_list, last_month, month)
return_data["last_month_income"] = yesterday_income
return_data["last_month_count"] = yesterday_count
return BaseResponse(data=return_data)
@route_index.route("/history_income", methods=["GET"])
def get_history_income_data():
agent = g.user
return_data = {"history_income": None, "history_count": None}
agent_spot_list = AgentSpot.query.filter_by(agent_no=agent.id, status=1).all()
if agent_spot_list:
spot_no_list = [i.spot_no for i in agent_spot_list]
else:
return BaseResponse(data=return_data)
today_income, today_count = IndexService.get_total_production(spot_no_list, start_time=None, end_time=None)
return_data["history_income"] = float(today_income)
return_data["history_count"] = today_count
return BaseResponse(data=return_data)
@route_index.route("/seven_day_count", methods=["GET"])
def get_seven_day_count_data():
agent = g.user
agent_spot_list = AgentSpot.query.filter_by(agent_no=agent.id, status=1).all()
if agent_spot_list:
spot_no_list = [i.spot_no for i in agent_spot_list]
else:
return BaseResponse(data=[])
spot_list = db.session.query(Spot).filter(Spot.id.in_(spot_no_list)).all()
if spot_list:
spot_data = {}
for i in spot_list:
spot_data[i.id] = i.spotname
else:
return BaseResponse(**ACCOUNT_AGENT_SPOT_NOT_EXIST)
return_data = []
seven_day, today = Helper.get_seven_date()
filter_list = [
Production.agent_total > 0,
Production.created_at >= seven_day,
Production.created_at < today,
]
result = db.session.query(func.count(Production.id), Production.spot_id,
func.date_format(Production.created_at, "%Y-%m-%d")).filter(*filter_list).group_by(
Production.spot_id, func.day(Production.created_at)).order_by(Production.created_at.asc()).all()
tmp_data = {}
for i in result:
count = i[0]
spot_id = i[1]
created_at = i[2]
if tmp_data.get(spot_id, None):
tmp_data[spot_id].append({"date": created_at, "count": count, "spot_id": spot_id})
else:
tmp_data[spot_id] = [{"date": created_at, "count": count, }]
for k, v in tmp_data.items():
return_data.append({"spot_id": k, "spot_name": spot_data.get(k, spot_id), "count_data": v})
return BaseResponse(data=return_data)
@route_index.route("/last_seven_day_count", methods=["GET"])
def get_last_seven_day_count_data():
agent = g.user
agent_spot_list = AgentSpot.query.filter_by(agent_no=agent.id, status=1).all()
if agent_spot_list:
spot_no_list = [i.spot_no for i in agent_spot_list]
else:
return BaseResponse(data=[])
spot_list = db.session.query(Spot).filter(Spot.id.in_(spot_no_list)).all()
if spot_list:
spot_data = {}
for i in spot_list:
spot_data[i.id] = i.spotname
else:
return BaseResponse(**ACCOUNT_AGENT_SPOT_NOT_EXIST)
return_data = []
today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
seven_day = today + datetime.timedelta(days=-7)
last_seven_day = seven_day + datetime.timedelta(days=-7)
filter_list = [
Production.agent_total > 0,
Production.created_at >= last_seven_day.strftime("%Y-%m-%d %H:%M:%S"),
Production.created_at < seven_day.strftime("%Y-%m-%d %H:%M:%S"),
]
result = db.session.query(func.count(Production.id), Production.spot_id,
func.date_format(Production.created_at, "%Y-%m-%d")).filter(*filter_list).group_by(
Production.spot_id, func.day(Production.created_at)).order_by(Production.created_at.asc()).all()
tmp_data = {}
for i in result:
count = i[0]
spot_id = i[1]
created_at = i[2]
if tmp_data.get(spot_id, None):
tmp_data[spot_id].append({"date": created_at, "count": count, "spot_id": spot_id})
else:
tmp_data[spot_id] = [{"date": created_at, "count": count, }]
for k, v in tmp_data.items():
return_data.append({"spot_id": k, "spot_name": spot_data.get(k, spot_id), "count_data": v})
return BaseResponse(data=return_data)
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
@author:Aeolus
"""
import logging
from myapps.pc_management import create_app
logger = logging.getLogger(__name__)
app = create_app('prod')
logger.info("run server")
if __name__ == '__main__':
app.run('127.0.0.1', 8894, debug=True)
# -*- coding: utf-8 -*-
import base64
import datetime
import hashlib
import random
import string
from config.commen_config import ACCOUNT_STATUS
from models.models import AdminAccount
from models.base_model import db
from models.models import Place
# from models.user_models import AgentAccount
# from service.spot_service import SpotService
class AdminService():
@staticmethod
def gene_salt(length=16):
key_list = [random.choice((string.ascii_letters + string.digits)) for i in range(length)]
return ("".join(key_list))
@staticmethod
def gene_agent_code(agent_info, salt):
"""
:param agent_info:
:param salt:
:return:
"""
m = hashlib.md5()
str = "%s-%s-%s-%s" % (agent_info.id, agent_info.user_name, agent_info.phone, salt)
m.update(str.encode("utf-8"))
return m.hexdigest()
@staticmethod
def gene_pwd(pwd, salt):
"""
:param pwd:
:param salt:
:return:
"""
m = hashlib.md5()
str = "%s-%s" % (base64.encodebytes(pwd.encode("utf-8")), salt)
m.update(str.encode("utf-8"))
return m.hexdigest()
@staticmethod
def check_agent_token(token):
"""
:param token:
:return:
"""
token = base64.b64decode(token).decode("utf-8")
try:
agent_info = AgentAccount.query.filter_by(access_token=token).first()
except Exception as e:
return 1
if not agent_info:
return 1
s = token.split("#")
if len(s) != 2:
return 1
if AgentService.gene_agent_code(agent_info, agent_info.salt) != s[0]:
return 1
if agent_info.expire_time < datetime.datetime.now():
return 2
return agent_info
@staticmethod
def create_agent_no():
'''
生成用户编号
:return:
'''
ran_int = str(random.randint(1, 999999)).zfill(6)
return 'ssw' + ran_int
@staticmethod
def get_spot_info(agent_info):
"""
:param agent_info:
:return:
"""
spot_info = []
infos = db.session.query(AgentSpot, Spot).join(
Spot, Spot.id == AgentSpot.spot_no).filter(AgentSpot.agent_no == agent_info.id,
AgentSpot.status == ACCOUNT_STATUS['on_use']).all()
for info in infos:
cur_info = {}
cur_info['id'] = info.Spot.id
cur_info['spotname'] = info.Spot.spotname
cur_info['letter'] = SpotService.get_pinyin(info.Spot.spotname)
spot_info.append(cur_info)
return spot_info
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/30 @file: jwt_util.py @function: @modify: """ import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['SECRET_KEY'] token = jwt.encode(_payload, secret, algorithm='HS256') return token def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['SECRET_KEY'] try: payload = jwt.decode(token, secret, algorithms=['HS256']) except jwt.PyJWTError: payload = None return payload if __name__ == '__main__': import time from config.env_path_config import env_path from dotenv import load_dotenv load_dotenv(dotenv_path=env_path, verbose=True, override=True) import os SECRET_KEY = os.getenv('SECRET_KEY') # token = generate_jwt({"user_id": 1}, time.time() + 6000, SECRET_KEY) token = generate_jwt({"user_no": 'SK000007'}, time.time() + 6000, SECRET_KEY) print(token) # for i in range(10): # result = verify_jwt(token, 'secret') # print(result) # print(time.time()) # time.sleep(1) #!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/30 @file: jwt_util.py @function: @modify: """ import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['SECRET_KEY'] token = jwt.encode(_payload, secret, algorithm='HS256') return token def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['SECRET_KEY'] try: payload = jwt.decode(token, secret, algorithms=['HS256']) except jwt.PyJWTError: payload = None return payload if __name__ == '__main__': import time from config.env_path_config import env_path from dotenv import load_dotenv load_dotenv(dotenv_path=env_path, verbose=True, override=True) import os SECRET_KEY = os.getenv('SECRET_KEY') token = generate_jwt({"user_id": 1}, time.time() + 6000, SECRET_KEY) # token = generate_jwt({"user_no": 'SK000007'}, time.time() + 6000, SECRET_KEY) print(token) # for i in range(10): # result = verify_jwt(token, 'secret') # print(result) # print(time.time()) # time.sleep(1)
\ No newline at end of file \ No newline at end of file
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment