Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Automat
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
冯佳佳
Automat
Commits
30a67d3d
Commit
30a67d3d
authored
Oct 16, 2021
by
Aeolus
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update
parent
ee4fd48f
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
91 additions
and
87 deletions
+91
-87
models/models.py
+5
-2
myapps/sukang24h/api/__init__.py
+2
-0
myapps/sukang24h/api/machine_portal.py
+50
-81
myapps/sukang24h/api/tallyman_portal.py
+30
-0
utils/jwt_util.py
+2
-2
utils/middlewares.py
+2
-2
No files found.
models/models.py
View file @
30a67d3d
...
...
@@ -57,13 +57,16 @@ class Machine(Base):
__tablename__
=
'machine'
id
=
Column
(
INTEGER
(
10
),
primary_key
=
True
)
machine_no
=
Column
(
String
(
17
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
unique
=
True
,
comment
=
'机柜编号'
)
machine_no
=
Column
(
String
(
20
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
unique
=
True
,
comment
=
'机柜编号'
)
qrcode_no
=
Column
(
String
(
20
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
unique
=
True
,
comment
=
'二维码编号'
)
mac
=
Column
(
String
(
30
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
unique
=
True
,
comment
=
'mac'
)
power
=
Column
(
TINYINT
(
3
),
nullable
=
False
,
server_default
=
text
(
"'0'"
),
comment
=
'电量'
)
short_address
=
Column
(
VARCHAR
(
45
))
address
=
Column
(
String
(
191
,
'utf8mb4_unicode_ci'
),
comment
=
'机柜位置'
)
place_id
=
Column
(
INTEGER
(
10
),
nullable
=
False
)
mch_platform
=
Column
(
INTEGER
(
11
),
nullable
=
False
,
server_default
=
text
(
"'1'"
),
comment
=
'1随身玩 2晓见文旅'
)
position
=
Column
(
String
(
20
,
'utf8mb4_unicode_ci'
),
comment
=
'机柜位置坐标'
)
hatch_number
=
Column
(
TINYINT
(
3
),
nullable
=
False
,
server_default
=
text
(
"'
6
0'"
),
comment
=
'机柜的仓口数量'
)
hatch_number
=
Column
(
TINYINT
(
3
),
nullable
=
False
,
server_default
=
text
(
"'0'"
),
comment
=
'机柜的仓口数量'
)
type
=
Column
(
TINYINT
(
3
),
nullable
=
False
,
server_default
=
text
(
"'1'"
),
comment
=
'机柜类型1正常'
)
status
=
Column
(
TINYINT
(
1
),
server_default
=
text
(
"'1'"
),
comment
=
'状态: 1正常-1删除'
)
created_at
=
Column
(
TIMESTAMP
,
nullable
=
False
,
server_default
=
text
(
"CURRENT_TIMESTAMP"
))
...
...
myapps/sukang24h/api/__init__.py
View file @
30a67d3d
...
...
@@ -11,6 +11,7 @@ from myapps.sukang24h.api.wx_auth_portal import wx_auth_route
from
myapps.sukang24h.api.hatch_portal
import
hatch_route
from
myapps.sukang24h.api.rent_portal
import
rent_route
from
myapps.sukang24h.api.tallyman_portal
import
tallyman_route
from
myapps.sukang24h.api.machine_portal
import
machine_route
def
register_sukang_blueprint
(
app
:
Flask
):
...
...
@@ -19,3 +20,4 @@ def register_sukang_blueprint(app: Flask):
app
.
register_blueprint
(
hatch_route
,
url_prefix
=
prefix
+
"/hatch"
)
app
.
register_blueprint
(
rent_route
,
url_prefix
=
prefix
+
"/rent"
)
app
.
register_blueprint
(
tallyman_route
,
url_prefix
=
prefix
+
"/tallyman"
)
app
.
register_blueprint
(
machine_route
,
url_prefix
=
prefix
+
"/machine"
)
myapps/sukang24h/api/machine_portal.py
View file @
30a67d3d
...
...
@@ -19,97 +19,66 @@ logger = logging.getLogger(__name__)
machine_route
=
Blueprint
(
'machine'
,
__name__
)
@machine_route.route
(
'list'
,
methods
=
[
"post"
])
def
get_production_list
():
@machine_route.route
(
'mac_upload'
,
methods
=
[
"post"
])
def
run_machine_mac_upload
():
json_data
=
request
.
get_json
()
machine_no
=
json_data
[
"machine_no"
]
mac
=
json_data
[
"mac"
]
machine
_info
=
Machine
.
query
.
filter_by
(
machine_no
=
machine_no
,
status
=
1
)
.
first
()
if
not
machine
_info
:
machine
=
Machine
.
query
.
filter_by
(
machine_no
=
machine_no
,
status
=
1
)
.
first
()
if
not
machine
:
return
jsonify
(
MACHINE_NOT_EXIST_ERROR
)
machine
.
mac
=
mac
db
.
session
.
add
(
machine
)
db
.
session
.
commit
()
return
BaseResponse
()
hatch_list
=
Hatch
.
query
.
filter_by
(
machine_no
=
machine_no
,
status
=
1
)
.
order_by
(
Hatch
.
hatch_no
.
asc
())
.
all
()
if
not
hatch_list
:
return
jsonify
(
HATCH_NOT_EXIST_ERROR
)
tmp_dict
=
{}
for
i
in
hatch_list
:
if
tmp_dict
.
get
(
i
.
production_id
,
None
)
is
None
:
tmp_dict
[
i
.
production_id
]
=
{
"machine_no"
:
i
.
machine_no
,
"hatch_no"
:
[
i
.
hatch_no
],
"production_id"
:
i
.
production_id
,
"name"
:
i
.
name
,
"title"
:
i
.
title
,
"brand_id"
:
i
.
brand_id
,
"brand_name"
:
i
.
brand_name
,
"cate_id"
:
i
.
cate_id
,
"cate_name"
:
i
.
cate_name
,
"price"
:
i
.
price
,
"original_price"
:
i
.
original_price
,
"img"
:
i
.
img
,
"tags"
:
i
.
tags
,
"content"
:
i
.
content
,
"summary"
:
i
.
summary
,
"status"
:
i
.
status
,
"count"
:
1
}
else
:
tmp_dict
[
i
.
production_id
][
"count"
]
+=
1
tmp_dict
[
i
.
production_id
][
"hatch_no"
]
.
append
(
i
.
hatch_no
)
hatch_data
=
list
(
tmp_dict
.
values
())
return
BaseResponse
(
data
=
hatch_data
)
@machine_route.route
(
'info'
,
methods
=
[
"post"
])
def
get_production_info
():
@machine_route.route
(
'power_upload'
,
methods
=
[
"post"
])
def
run_machine_power_upload
():
json_data
=
request
.
get_json
()
machine_no
=
json_data
[
"machine_no"
]
hatch_no
=
json_data
[
"hatch_no
"
]
power
=
json_data
[
"power
"
]
machine
_info
=
Machine
.
query
.
filter_by
(
machine_no
=
machine_no
,
status
=
1
)
.
first
()
if
not
machine
_info
:
machine
=
Machine
.
query
.
filter_by
(
machine_no
=
machine_no
,
status
=
1
)
.
first
()
if
not
machine
:
return
jsonify
(
MACHINE_NOT_EXIST_ERROR
)
machine
.
power
=
power
db
.
session
.
add
(
machine
)
db
.
session
.
commit
()
return
BaseResponse
()
hatch_info
=
Hatch
.
query
.
filter_by
(
machine_no
=
machine_no
,
hatch_no
=
hatch_no
)
.
first
()
if
not
hatch_info
:
return
jsonify
(
HATCH_NOT_EXIST_ERROR
)
hatch_data
=
{
"machine_no"
:
hatch_info
.
machine_no
,
"hatch_no"
:
hatch_info
.
hatch_no
,
"production_id"
:
hatch_info
.
production_id
,
"name"
:
hatch_info
.
name
,
"title"
:
hatch_info
.
title
,
"brand_id"
:
hatch_info
.
brand_id
,
"brand_name"
:
hatch_info
.
brand_name
,
"cate_id"
:
hatch_info
.
cate_id
,
"cate_name"
:
hatch_info
.
cate_name
,
"price"
:
hatch_info
.
price
,
"original_price"
:
hatch_info
.
original_price
,
"img"
:
hatch_info
.
img
,
"tags"
:
hatch_info
.
tags
,
"content"
:
hatch_info
.
content
,
"summary"
:
hatch_info
.
summary
,
"status"
:
hatch_info
.
status
,
}
return
BaseResponse
(
data
=
hatch_data
)
@machine_route.route
(
"/hatch_open"
,
methods
=
[
"POST"
])
def
run_hatch_open
():
@machine_route.route
(
'hatch_number_upload'
,
methods
=
[
"post"
])
def
run_machine_hatch_number_upload
():
json_data
=
request
.
get_json
()
machine_no
=
json_data
[
'machine_no'
]
hatch_no
=
json_data
[
'hatch_no'
]
user_id
=
g
.
user
.
id
rent_detail
=
RentDetail
.
query
.
filter
(
RentDetail
.
user_id
==
user_id
,
RentDetail
.
machine_no
==
machine_no
,
RentDetail
.
hatch_no
==
hatch_no
,
RentDetail
.
status
!=
-
1
,
RentDetail
.
is_take
==
0
)
.
order_by
(
RentDetail
.
id
.
desc
())
.
first
()
if
rent_detail
:
rent_detail
.
is_take
=
1
db
.
session
.
add
(
rent_detail
)
db
.
session
.
commit
()
machine_no
=
json_data
[
"machine_no"
]
hatch_number
=
json_data
[
"hatch_number"
]
machine
=
Machine
.
query
.
filter_by
(
machine_no
=
machine_no
,
status
=
1
)
.
first
()
if
not
machine
:
return
jsonify
(
MACHINE_NOT_EXIST_ERROR
)
machine
.
hatch_number
=
hatch_number
db
.
session
.
add
(
machine
)
db
.
session
.
commit
()
return
BaseResponse
()
@machine_route.route
(
'hatch_data_upload'
,
methods
=
[
"post"
])
def
run_machine_hatch_data_upload
():
json_data
=
request
.
get_json
()
machine_no
=
json_data
[
"machine_no"
]
hatch_data
=
json_data
[
"in_stock"
]
machine
=
Machine
.
query
.
filter_by
(
machine_no
=
machine_no
,
status
=
1
)
.
first
()
if
not
machine
:
return
jsonify
(
MACHINE_NOT_EXIST_ERROR
)
Hatch
.
query
.
filter
(
Hatch
.
hatch_no
.
in_
(
hatch_data
))
.
update
({
"status"
:
1
})
db
.
session
.
commit
()
Hatch
.
query
.
filter
(
Hatch
.
hatch_no
.
notin_
(
hatch_data
))
.
update
({
"status"
:
2
})
db
.
session
.
commit
()
return
BaseResponse
()
myapps/sukang24h/api/tallyman_portal.py
View file @
30a67d3d
...
...
@@ -305,3 +305,33 @@ def run_tally_over():
db
.
session
.
commit
()
return
BaseResponse
()
@tallyman_route.route
(
"/tally_report"
,
methods
=
[
"POST"
])
def
get_tally_report
():
json_data
=
request
.
get_json
()
machine_info
=
TallymanService
.
get_machine_list
(
g
.
user
)
empty_number
=
0
for
i
in
machine_info
:
empty_number
+=
int
(
i
[
"empty_number"
])
over_number
=
TallyRecord
.
query
.
filter
(
TallyRecord
.
user_no
==
g
.
user
.
user_no
,
TallyRecord
.
status
==
2
)
.
count
()
return
BaseResponse
(
data
=
{
"empty_number"
:
empty_number
,
"over_number"
:
over_number
})
@tallyman_route.route
(
'machine_activate'
,
methods
=
[
"post"
])
def
run_machine_activate
():
json_data
=
request
.
get_json
()
qrcode_no
=
json_data
[
"qrcode_no"
]
machine_no
=
json_data
[
"machine_no"
]
machine
=
Machine
.
query
.
filter_by
(
qrcode_no
=
qrcode_no
,
status
=
1
)
.
first
()
if
machine
and
machine
.
machine_no
==
qrcode_no
:
machine
.
machine_no
=
machine_no
db
.
session
.
add
(
machine
)
db
.
session
.
commit
()
return
BaseResponse
()
else
:
return
jsonify
(
MACHINE_NOT_EXIST_ERROR
)
utils/jwt_util.py
View file @
30a67d3d
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/30 @file: jwt_util.py @function: @modify: """ import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['SECRET_KEY'] token = jwt.encode(_payload, secret, algorithm='HS256') return token def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['SECRET_KEY'] try: payload = jwt.decode(token, secret, algorithms=['HS256']) except jwt.PyJWTError: payload = None return payload if __name__ == '__main__': import time from config.env_path_config import env_path from dotenv import load_dotenv load_dotenv(dotenv_path=env_path, verbose=True, override=True) import os SECRET_KEY = os.getenv('SECRET_KEY') token = generate_jwt({"user_id": 1}, time.time() + 6000, SECRET_KEY) # token = generate_jwt({"user_no": 'SK000007'}, time.time() + 6000, SECRET_KEY) print(token) # for i in range(10): # result = verify_jwt(token, 'secret') # print(result) # print(time.time()) # time.sleep(1)
\ No newline at end of file
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/30 @file: jwt_util.py @function: @modify: """ import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['SECRET_KEY'] token = jwt.encode(_payload, secret, algorithm='HS256') return token def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['SECRET_KEY'] try: payload = jwt.decode(token, secret, algorithms=['HS256']) except jwt.PyJWTError: payload = None return payload if __name__ == '__main__': import time from config.env_path_config import env_path from dotenv import load_dotenv load_dotenv(dotenv_path=env_path, verbose=True, override=True) import os SECRET_KEY = os.getenv('SECRET_KEY') # token = generate_jwt({"user_id": 1}, time.time() + 6000, SECRET_KEY) token = generate_jwt({"user_no": 'SK000007'}, time.time() + 6000, SECRET_KEY) print(token) # for i in range(10): # result = verify_jwt(token, 'secret') # print(result) # print(time.time()) # time.sleep(1)
\ No newline at end of file
...
...
utils/middlewares.py
View file @
30a67d3d
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ print(request.path) path_list = request.path.split("/") if current_app.name == "sukang24h": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() return except Exception as e: print(e) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser, TallymanAccount from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ print(request.path) path_list = request.path.split("/") if current_app.name == "sukang24h": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), url_for('tallyman.run_tallyman_login'), ] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" if request.path.split("/")[2] == "tallyman": user_no = payload.get('user_no') if not user_no: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) if request.path.split("/")[2] == "machine": user_no = payload.get('user_no', None) user_id = payload.get('user_id', None) if user_no: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) if user_id: try: g.user = TallymanAccount.query.filter_by(user_no=user_no).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) return BaseResponse(**TOKEN_NOT_VALID_ERROR) user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() if not g.user: return BaseResponse(**TOKEN_NOT_VALID_ERROR) return except Exception as e: print(e) return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) else: NO_AUTH_CHECK_URL = [] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() return except Exception as e: print(e) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment