Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Automat
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
冯佳佳
Automat
Commits
e48d50f4
Commit
e48d50f4
authored
Oct 15, 2021
by
Aeolus
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update
parent
3e916194
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
154 additions
and
344 deletions
+154
-344
models/models.py
+5
-5
myapps/sukang24h/api/hatch_portal.py
+12
-6
myapps/sukang24h/api/rent_portal.py
+104
-51
mytest.py
+12
-0
service/rent_service.py
+7
-7
utils/error_code.py
+0
-0
utils/jwt_util.py
+2
-66
utils/middlewares.py
+2
-109
utils/my_redis_cache.py
+2
-11
utils/my_response.py
+2
-21
utils/mylogger.py
+2
-15
utils/wechat/WXBizDataCrypt.py
+2
-45
utils/wechat/WXBizMsgCrypt.py
+0
-0
utils/wechat/__init__.py
+2
-8
No files found.
models/models.py
View file @
e48d50f4
...
...
@@ -35,7 +35,7 @@ class Hatch(Base):
hatch_no
=
Column
(
TINYINT
(
3
),
nullable
=
False
,
comment
=
'机柜仓口号'
)
production_id
=
Column
(
INTEGER
(
10
),
nullable
=
False
,
comment
=
'商品id'
)
name
=
Column
(
String
(
100
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品名称'
)
tit
i
le
=
Column
(
String
(
200
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品标题'
)
title
=
Column
(
String
(
200
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品标题'
)
brand_id
=
Column
(
INTEGER
(
10
),
nullable
=
False
,
comment
=
'品牌ID'
)
brand_name
=
Column
(
String
(
100
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品名称'
)
cate_id
=
Column
(
INTEGER
(
10
),
nullable
=
False
,
comment
=
'分类ID'
)
...
...
@@ -102,7 +102,7 @@ class Production(Base):
id
=
Column
(
INTEGER
(
10
),
primary_key
=
True
)
name
=
Column
(
String
(
100
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
index
=
True
,
comment
=
'商品名称'
)
tit
i
le
=
Column
(
String
(
200
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品标题'
)
title
=
Column
(
String
(
200
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品标题'
)
brand_id
=
Column
(
INTEGER
(
10
),
nullable
=
False
,
comment
=
'品牌ID'
)
cate_id
=
Column
(
INTEGER
(
10
),
nullable
=
False
,
comment
=
'分类ID'
)
price
=
Column
(
INTEGER
(
10
),
nullable
=
False
,
comment
=
'价格'
)
...
...
@@ -155,7 +155,7 @@ class RentDetail(Base):
production_id
=
Column
(
INTEGER
(
10
),
nullable
=
False
,
comment
=
'商品id'
)
is_take
=
Column
(
TINYINT
(
3
),
nullable
=
False
,
server_default
=
text
(
"'0'"
),
comment
=
'是否取货'
)
name
=
Column
(
String
(
100
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品名称'
)
tit
i
le
=
Column
(
String
(
200
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品标题'
)
title
=
Column
(
String
(
200
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品标题'
)
brand_id
=
Column
(
INTEGER
(
10
),
nullable
=
False
,
comment
=
'品牌ID'
)
brand_name
=
Column
(
String
(
100
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'商品名称'
)
cate_id
=
Column
(
INTEGER
(
10
),
nullable
=
False
,
comment
=
'分类ID'
)
...
...
@@ -165,7 +165,7 @@ class RentDetail(Base):
tags
=
Column
(
String
(
255
,
'utf8mb4_unicode_ci'
),
comment
=
'商品标签'
)
content
=
Column
(
Text
(
collation
=
'utf8mb4_unicode_ci'
),
comment
=
'商品内容'
)
summary
=
Column
(
Text
(
collation
=
'utf8mb4_unicode_ci'
),
comment
=
'商品描述'
)
status
=
Column
(
TINYINT
(
3
),
nullable
=
False
,
server_default
=
text
(
"'
0'"
),
comment
=
'1正常 2
删除'
)
status
=
Column
(
TINYINT
(
3
),
nullable
=
False
,
server_default
=
text
(
"'
1'"
),
comment
=
'1正常 -1
删除'
)
created_at
=
Column
(
TIMESTAMP
,
server_default
=
text
(
"CURRENT_TIMESTAMP"
))
updated_at
=
Column
(
TIMESTAMP
,
server_default
=
text
(
"CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
))
...
...
@@ -175,7 +175,7 @@ class SalePlan(Base):
id
=
Column
(
INTEGER
(
10
),
primary_key
=
True
)
name
=
Column
(
String
(
100
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'方案名称'
)
tit
i
le
=
Column
(
String
(
200
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'方案标题'
)
title
=
Column
(
String
(
200
,
'utf8mb4_unicode_ci'
),
nullable
=
False
,
comment
=
'方案标题'
)
status
=
Column
(
TINYINT
(
1
),
nullable
=
False
,
comment
=
'状态: 1整除-1删除'
)
created_at
=
Column
(
TIMESTAMP
,
server_default
=
text
(
"CURRENT_TIMESTAMP"
))
updated_at
=
Column
(
TIMESTAMP
,
server_default
=
text
(
"CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
))
...
...
myapps/sukang24h/api/hatch_portal.py
View file @
e48d50f4
...
...
@@ -27,16 +27,19 @@ def get_production_list():
if
not
machine_info
:
return
jsonify
(
MACHINE_NOT_EXIST_ERROR
)
hatch_list
=
Hatch
.
query
.
filter_by
(
machine_no
=
machine_no
)
.
order_by
(
Hatch
.
hatch_no
.
asc
())
.
all
()
hatch_list
=
Hatch
.
query
.
filter_by
(
machine_no
=
machine_no
,
status
=
1
)
.
order_by
(
Hatch
.
hatch_no
.
asc
())
.
all
()
if
not
hatch_list
:
return
jsonify
(
HATCH_NOT_EXIST_ERROR
)
hatch_data
=
[{
tmp_dict
=
{}
for
i
in
hatch_list
:
if
tmp_dict
.
get
(
i
.
production_id
,
None
):
tmp_dict
[
i
.
production_id
]
=
{
"machine_no"
:
i
.
machine_no
,
"hatch_no"
:
i
.
hatch_no
,
"production_id"
:
i
.
production_id
,
"name"
:
i
.
name
,
"titile"
:
i
.
titi
le
,
"title"
:
i
.
tit
le
,
"brand_id"
:
i
.
brand_id
,
"brand_name"
:
i
.
brand_name
,
"cate_id"
:
i
.
cate_id
,
...
...
@@ -48,8 +51,11 @@ def get_production_list():
"content"
:
i
.
content
,
"summary"
:
i
.
summary
,
"status"
:
i
.
status
,
}
for
i
in
hatch_list
]
"count"
:
1
}
else
:
tmp_dict
[
i
.
production_id
][
"count"
]
+=
1
hatch_data
=
tmp_dict
.
values
()
return
BaseResponse
(
data
=
hatch_data
)
...
...
@@ -72,7 +78,7 @@ def get_production_info():
"hatch_no"
:
hatch_info
.
hatch_no
,
"production_id"
:
hatch_info
.
production_id
,
"name"
:
hatch_info
.
name
,
"tit
ile"
:
hatch_info
.
titi
le
,
"tit
le"
:
hatch_info
.
tit
le
,
"brand_id"
:
hatch_info
.
brand_id
,
"brand_name"
:
hatch_info
.
brand_name
,
"cate_id"
:
hatch_info
.
cate_id
,
...
...
myapps/sukang24h/api/rent_portal.py
View file @
e48d50f4
This diff is collapsed.
Click to expand it.
mytest.py
0 → 100644
View file @
e48d50f4
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/10/15
@file: mytest.py
@function:
@modify:
"""
a
=
{
'productions'
:
[
'3'
],
'machine_no'
:
'1636127865'
,
'user_id'
:
1
,
'machine_id'
:
1
,
'platform'
:
1
}
service/rent_service.py
View file @
e48d50f4
...
...
@@ -5,6 +5,8 @@ import random
from
sqlalchemy.exc
import
SQLAlchemyError
import
logging
from
config.commen_config
import
USER_RENT_PREPAY_ID
from
models.base_model
import
db
from
models.models
import
Rent
from
utils.my_redis_cache
import
redis_client
...
...
@@ -36,8 +38,10 @@ class RentService(object):
try
:
model
=
Rent
()
model
.
rent_no
=
rent_no
model
.
machine_id
=
machine
.
id
model
.
customer_id
=
data
[
"user_id"
]
model
.
machine_no
=
machine
.
machine_no
model
.
user_id
=
data
[
"user_id"
]
model
.
place_id
=
machine
.
place_id
model
.
total
=
0
model
.
add_time
=
datetime
.
datetime
.
now
()
model
.
spot_id
=
machine
.
spot_id
model
.
business_id
=
machine
.
business_id
...
...
@@ -46,11 +50,7 @@ class RentService(object):
prepay_id
=
redis_client
.
get
(
USER_RENT_PREPAY_ID
+
str
(
data
[
"user_id"
])
+
rent_no
)
if
prepay_id
:
model
.
prepay_id
=
prepay_id
if
data
[
"power_type"
]:
model
.
power_type
=
data
[
"power_type"
]
power_money
=
PowerMoney
.
query
.
filter_by
(
machine_id
=
machine
.
id
,
power_type
=
data
[
"power_type"
])
.
first
()
model
.
one_day_price
=
machine
.
one_day_price
if
not
power_money
else
power_money
.
total
db
.
session
.
add
(
model
)
db
.
session
.
commit
()
return
model
...
...
utils/error_code.py
View file @
e48d50f4
This diff is collapsed.
Click to expand it.
utils/jwt_util.py
View file @
e48d50f4
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/30
@file: jwt_util.py
@function:
@modify:
"""
import
jwt
from
flask
import
current_app
def
generate_jwt
(
payload
,
expiry
,
secret
=
None
):
"""
生成jwt
:param payload: dict 载荷
:param expiry: datetime 有效期
:param secret: 密钥
:return: jwt
"""
_payload
=
{
'exp'
:
expiry
}
_payload
.
update
(
payload
)
if
not
secret
:
secret
=
current_app
.
config
[
'SECRET_KEY'
]
token
=
jwt
.
encode
(
_payload
,
secret
,
algorithm
=
'HS256'
)
return
token
def
verify_jwt
(
token
,
secret
=
None
):
"""
检验jwt
:param token: jwt
:param secret: 密钥
:return: dict: payload
"""
if
not
secret
:
secret
=
current_app
.
config
[
'SECRET_KEY'
]
try
:
payload
=
jwt
.
decode
(
token
,
secret
,
algorithms
=
[
'HS256'
])
except
jwt
.
PyJWTError
:
payload
=
None
return
payload
if
__name__
==
'__main__'
:
import
time
from
config.env_path_config
import
env_path
from
dotenv
import
load_dotenv
load_dotenv
(
dotenv_path
=
env_path
,
verbose
=
True
,
override
=
True
)
import
os
SECRET_KEY
=
os
.
getenv
(
'SECRET_KEY'
)
token
=
generate_jwt
({
"user_id"
:
1
},
time
.
time
()
+
6000
,
SECRET_KEY
)
print
(
token
)
# for i in range(10):
# result = verify_jwt(token, 'secret')
# print(result)
# print(time.time())
# time.sleep(1)
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/30 @file: jwt_util.py @function: @modify: """ import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['SECRET_KEY'] token = jwt.encode(_payload, secret, algorithm='HS256') return token def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['SECRET_KEY'] try: payload = jwt.decode(token, secret, algorithms=['HS256']) except jwt.PyJWTError: payload = None return payload if __name__ == '__main__': import time from config.env_path_config import env_path from dotenv import load_dotenv load_dotenv(dotenv_path=env_path, verbose=True, override=True) import os SECRET_KEY = os.getenv('SECRET_KEY') token = generate_jwt({"user_id": 1}, time.time() + 6000, SECRET_KEY) print(token) # for i in range(10): # result = verify_jwt(token, 'secret') # print(result) # print(time.time()) # time.sleep(1)
\ No newline at end of file
...
...
utils/middlewares.py
View file @
e48d50f4
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/26
@file: middlewares.py
@function:
@modify:
"""
import
logging
from
flask
import
g
,
request
,
url_for
,
current_app
,
make_response
,
jsonify
from
config.wechat_config
import
platform_config_list
from
models.models
import
WxUser
from
utils.error_code
import
TOKEN_NOT_VALID_ERROR
from
utils.my_response
import
BaseResponse
from
utils.jwt_util
import
verify_jwt
logger
=
logging
.
getLogger
(
__name__
)
def
log_enter_interface
():
"""
日志打印进入接口
:return:
"""
logger
.
info
(
"######################### 进入 {} 接口 ################################ "
.
format
(
request
.
path
))
def
log_out_interface
(
environ
):
"""
日志打印退出接口
:return:
"""
logger
.
info
(
"######################### 退出 {} 接口 ################################
\n
"
.
format
(
request
.
path
))
return
environ
def
close_db_session
(
environ
):
from
models.base_model
import
db
db
.
session
.
close
()
return
environ
"""用户认证机制==>每次请求前获取并校验token"""
"@myapps.before_request 不使@调用装饰器 在 init文件直接装饰"
def
jwt_authentication
():
"""
1.获取请求头Authorization中的token
2.判断是否以 Bearer开头
3.使用jwt模块进行校验
4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存
"""
if
current_app
.
name
==
"sukang24h"
:
NO_AUTH_CHECK_URL
=
[
url_for
(
'wx_auth.my_test'
),
url_for
(
'wx_auth.mini_login'
),
url_for
(
'rent.wx_pay_callback'
),
url_for
(
'hatch.get_production_list'
),
]
else
:
NO_AUTH_CHECK_URL
=
[]
if
request
.
path
not
in
NO_AUTH_CHECK_URL
:
token
=
request
.
headers
.
get
(
'Authorization'
)
# "校验token"
payload
=
verify_jwt
(
token
)
# "判断token的校验结果"
if
payload
:
# "获取载荷中的信息赋值给g对象"
user_id
=
payload
.
get
(
'user_id'
)
if
not
user_id
:
return
BaseResponse
(
**
TOKEN_NOT_VALID_ERROR
)
try
:
g
.
user
=
WxUser
.
query
.
filter_by
(
id
=
user_id
)
.
first
()
return
except
Exception
as
e
:
print
(
e
)
else
:
return
BaseResponse
(
**
TOKEN_NOT_VALID_ERROR
)
def
get_platform
():
"""
:return:
"""
g
.
platform
=
request
.
headers
.
get
(
'platform'
,
"sukang24h"
)
def
all_options_pass
():
"""
:return:
"""
if
request
.
method
==
"OPTIONS"
:
headers
=
{
'Access-Control-Allow-Origin'
:
'*'
,
'Access-Control-Allow-Methods'
:
'POST'
,
'Access-Control-Allow-Headers'
:
'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform'
,
}
return
make_response
((
jsonify
({
'error_code'
:
0
}),
200
,
headers
))
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/03/26 @file: middlewares.py @function: @modify: """ import logging from flask import g, request, url_for, current_app, make_response, jsonify from config.wechat_config import platform_config_list from models.models import WxUser from utils.error_code import TOKEN_NOT_VALID_ERROR from utils.my_response import BaseResponse from utils.jwt_util import verify_jwt logger = logging.getLogger(__name__) def log_enter_interface(): """ 日志打印进入接口 :return: """ logger.info("######################### 进入 {} 接口 ################################ ".format(request.path)) def log_out_interface(environ): """ 日志打印退出接口 :return: """ logger.info("######################### 退出 {} 接口 ################################\n".format(request.path)) return environ def close_db_session(environ): from models.base_model import db db.session.close() return environ """用户认证机制==>每次请求前获取并校验token""" "@myapps.before_request 不使@调用装饰器 在 init文件直接装饰" def jwt_authentication(): """ 1.获取请求头Authorization中的token 2.判断是否以 Bearer开头 3.使用jwt模块进行校验 4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存 """ if current_app.name == "sukang24h": NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'), url_for('wx_auth.mini_login'), url_for('rent.wx_pay_callback'), url_for('hatch.get_production_list'), ] else: NO_AUTH_CHECK_URL = [] if request.path not in NO_AUTH_CHECK_URL: token = request.headers.get('Authorization') # "校验token" payload = verify_jwt(token) # "判断token的校验结果" if payload: # "获取载荷中的信息赋值给g对象" user_id = payload.get('user_id') if not user_id: return BaseResponse(**TOKEN_NOT_VALID_ERROR) try: g.user = WxUser.query.filter_by(id=user_id).first() return except Exception as e: print(e) else: return BaseResponse(**TOKEN_NOT_VALID_ERROR) def get_platform(): """ :return: """ g.platform = request.headers.get('platform', "sukang24h") def all_options_pass(): """ :return: """ if request.method == "OPTIONS": headers = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'POST', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform', } return make_response((jsonify({'error_code': 0}), 200, headers))
\ No newline at end of file
...
...
utils/my_redis_cache.py
View file @
e48d50f4
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: my_redis_cache.py
"""
from
flask_redis
import
FlaskRedis
redis_client
=
FlaskRedis
(
config_prefix
=
'TENCENT_REDIS'
)
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @file: my_redis_cache.py """ from flask_redis import FlaskRedis redis_client = FlaskRedis(config_prefix='TENCENT_REDIS')
\ No newline at end of file
...
...
utils/my_response.py
View file @
e48d50f4
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: my_response.py
@function:
@modify:
"""
from
flask
import
Response
from
flask.json
import
dumps
class
BaseResponse
(
Response
):
def
__init__
(
self
,
data
=
None
,
error_code
=
0
,
error_message
=
'Success'
,
*
args
,
**
kwargs
):
if
data
is
not
None
:
result
=
dumps
(
dict
(
data
=
data
,
error_code
=
error_code
,
error_message
=
error_message
,
*
args
,
**
kwargs
))
else
:
result
=
dumps
(
dict
(
error_code
=
error_code
,
error_message
=
error_message
,
*
args
,
**
kwargs
))
Response
.
__init__
(
self
,
result
,
mimetype
=
'application/json'
)
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @time: 2021/04/27 @file: my_response.py @function: @modify: """ from flask import Response from flask.json import dumps class BaseResponse(Response): def __init__(self, data=None, error_code=0, error_message='Success', *args, **kwargs): if data is not None: result = dumps(dict(data=data, error_code=error_code, error_message=error_message, *args, **kwargs)) else: result = dumps(dict(error_code=error_code, error_message=error_message, *args, **kwargs)) Response.__init__(self, result, mimetype='application/json')
\ No newline at end of file
...
...
utils/mylogger.py
View file @
e48d50f4
import
logging
from
flask_log_request_id
import
RequestIDLogFilter
def
set_logger
():
console_handler
=
logging
.
StreamHandler
()
console_handler
.
setLevel
(
logging
.
DEBUG
)
console_handler
.
setFormatter
(
logging
.
Formatter
(
"
%(asctime)
s -
%(filename)
s -
%(funcName)
s -[line:
%(lineno)
d] -
%(levelname)
s - request_id=
%(request_id)
s:
%(message)
s"
,
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
))
console_handler
.
addFilter
(
RequestIDLogFilter
())
logging
.
getLogger
()
.
setLevel
(
logging
.
DEBUG
)
logging
.
getLogger
()
.
addHandler
(
console_handler
)
import
logging
from
flask_log_request_id
import
RequestIDLogFilter
def
set_logger
():
console_handler
=
logging
.
StreamHandler
()
console_handler
.
setLevel
(
logging
.
DEBUG
)
console_handler
.
setFormatter
(
logging
.
Formatter
(
"
%(asctime)
s -
%(filename)
s -
%(funcName)
s -[line:
%(lineno)
d] -
%(levelname)
s - request_id=
%(request_id)
s:
%(message)
s"
,
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
))
console_handler
.
addFilter
(
RequestIDLogFilter
())
logging
.
getLogger
()
.
setLevel
(
logging
.
DEBUG
)
logging
.
getLogger
()
.
addHandler
(
console_handler
)
\ No newline at end of file
...
...
utils/wechat/WXBizDataCrypt.py
View file @
e48d50f4
import
base64
import
json
from
Crypto.Cipher
import
AES
class
WXBizDataCrypt
:
def
__init__
(
self
,
appId
,
sessionKey
):
self
.
appId
=
appId
self
.
sessionKey
=
sessionKey
def
decrypt
(
self
,
encryptedData
,
iv
):
sessionKey
=
base64
.
decodebytes
(
bytes
(
self
.
sessionKey
,
encoding
=
'utf8'
))
encryptedData
=
base64
.
decodebytes
(
bytes
(
encryptedData
,
encoding
=
'utf8'
))
iv
=
base64
.
decodebytes
(
bytes
(
iv
,
encoding
=
'utf8'
))
cipher
=
AES
.
new
(
sessionKey
,
AES
.
MODE_CBC
,
iv
)
des_str
=
cipher
.
decrypt
(
encryptedData
)
print
(
"=================================="
)
print
(
des_str
)
des_str
=
self
.
_unpad
(
des_str
)
print
(
des_str
)
des_str
=
str
(
des_str
,
encoding
=
'utf-8'
)
decrypted
=
json
.
loads
(
des_str
)
if
decrypted
[
'watermark'
][
'appid'
]
!=
self
.
appId
:
raise
Exception
(
'Invalid Buffer'
)
return
decrypted
def
_unpad
(
self
,
s
):
return
s
[:
-
ord
(
s
[
len
(
s
)
-
1
:])]
if
__name__
==
'__main__'
:
appId
=
'wx3185fb4a3633beb0'
sessionKey
=
'S7CMDfC6jXJKSaWKanG8oQ=='
encryptedData
=
'E7LZhvK7mOcaYsv9xcAfsBN9eSbzFh9FyMtFJ0zsFB0M62zRJ0cosZWksUujUR5WYUmNoIfIJnTIF8gRskxxbFU3fm5X7z4ChZecMSaFM65aEK1suRUD1U0ubB7mOwBBlY4ftdPT5kRwWgXKVkM4VAkYGN8A4fjWE93yGtjzxXs9dypQkCLSNWs6Kw5USEzjhtDZnptVy+lHF5fTXRuzoCstW2Cto4YI3G9hmnS64QuWjRteSqIgh8GN1zEPN0dROJjaWBjqraBCt/BfMsk4HBeL4PA75K8WdqVgKGfQ7/rnmPFOsNXWfajx9jl7XcrfoPaaPL1DmIJ1BlQne2GuLFtzZ3O4/8cdVQ9Lb0N/3kFAzjgzNFNLSYj2VNctmWyLdWi8hH90yslvrODIhMzIsuux2GIAfp0rQd/iVIVvtd7PXBOCe5iZ7aaqD0b0mLF4CmsuBpl8Eh20ZHkYw2SqO0x9uFrS/gy1vwtkmsTpcDw='
iv
=
'DQcmcXyQkU+VKqb2mKmasQ=='
pc
=
WXBizDataCrypt
(
appId
,
sessionKey
)
pc
.
decrypt
(
encryptedData
,
iv
)
#
import
base64
import
json
from
Crypto.Cipher
import
AES
class
WXBizDataCrypt
:
def
__init__
(
self
,
appId
,
sessionKey
):
self
.
appId
=
appId
self
.
sessionKey
=
sessionKey
def
decrypt
(
self
,
encryptedData
,
iv
):
sessionKey
=
base64
.
decodebytes
(
bytes
(
self
.
sessionKey
,
encoding
=
'utf8'
))
encryptedData
=
base64
.
decodebytes
(
bytes
(
encryptedData
,
encoding
=
'utf8'
))
iv
=
base64
.
decodebytes
(
bytes
(
iv
,
encoding
=
'utf8'
))
cipher
=
AES
.
new
(
sessionKey
,
AES
.
MODE_CBC
,
iv
)
des_str
=
cipher
.
decrypt
(
encryptedData
)
print
(
"=================================="
)
print
(
des_str
)
des_str
=
self
.
_unpad
(
des_str
)
print
(
des_str
)
des_str
=
str
(
des_str
,
encoding
=
'utf-8'
)
decrypted
=
json
.
loads
(
des_str
)
if
decrypted
[
'watermark'
][
'appid'
]
!=
self
.
appId
:
raise
Exception
(
'Invalid Buffer'
)
return
decrypted
def
_unpad
(
self
,
s
):
return
s
[:
-
ord
(
s
[
len
(
s
)
-
1
:])]
if
__name__
==
'__main__'
:
appId
=
'wx3185fb4a3633beb0'
sessionKey
=
'S7CMDfC6jXJKSaWKanG8oQ=='
encryptedData
=
'E7LZhvK7mOcaYsv9xcAfsBN9eSbzFh9FyMtFJ0zsFB0M62zRJ0cosZWksUujUR5WYUmNoIfIJnTIF8gRskxxbFU3fm5X7z4ChZecMSaFM65aEK1suRUD1U0ubB7mOwBBlY4ftdPT5kRwWgXKVkM4VAkYGN8A4fjWE93yGtjzxXs9dypQkCLSNWs6Kw5USEzjhtDZnptVy+lHF5fTXRuzoCstW2Cto4YI3G9hmnS64QuWjRteSqIgh8GN1zEPN0dROJjaWBjqraBCt/BfMsk4HBeL4PA75K8WdqVgKGfQ7/rnmPFOsNXWfajx9jl7XcrfoPaaPL1DmIJ1BlQne2GuLFtzZ3O4/8cdVQ9Lb0N/3kFAzjgzNFNLSYj2VNctmWyLdWi8hH90yslvrODIhMzIsuux2GIAfp0rQd/iVIVvtd7PXBOCe5iZ7aaqD0b0mLF4CmsuBpl8Eh20ZHkYw2SqO0x9uFrS/gy1vwtkmsTpcDw='
iv
=
'DQcmcXyQkU+VKqb2mKmasQ=='
pc
=
WXBizDataCrypt
(
appId
,
sessionKey
)
pc
.
decrypt
(
encryptedData
,
iv
)
#
\ No newline at end of file
...
...
utils/wechat/WXBizMsgCrypt.py
View file @
e48d50f4
This diff is collapsed.
Click to expand it.
utils/wechat/__init__.py
View file @
e48d50f4
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: __init__.py.py
"""
\ No newline at end of file
#!usr/bin/.env python # -*- coding:utf-8 _*- """ @version: author:Aeolus @file: __init__.py.py """
\ No newline at end of file
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment