Commit 5c17dea5 by Aeolus

update

parent 9e71eecf
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
config/.env/
venv/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
__pycache__/
.idea/
*.pyc
.gz
.txt
*.zip
.log*
*.env
*.rar
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Cert
*.pem
*.pfx
*.key
*.crt
*.cer
*.truststore
# IDE
*.DS_Store
# Media
*.jpg
#Document
*.xls
env_path_config.py
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/25
@file: app_config.py
@function:
@modify:
"""
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY')
SQLALCHEMY_DATABASE_URI = os.environ.get("SQLALCHEMY_DATABASE_URI")
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_RECORD_QUERIES = True
SQLALCHEMY_POOL_SIZE = 10
SQLALCHEMY_POOL_RECYCLE = 1800
JWT_SECRET = SECRET_KEY
TENCENT_REDIS_URL = os.getenv("TENCENT_REDIS_URL")
@staticmethod
def init_app(app):
pass
class DevelopmentConfig(Config):
DEBUG = True
class ProductionConfig(Config):
DEBUG = False
config = {
'dev': DevelopmentConfig,
'prod': ProductionConfig,
'default': DevelopmentConfig
}
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: base_config.py
"""
import os
SECRET_KEY = os.getenv('SECRET_KEY')
SQLALCHEMY_DATABASE_URI = os.getenv("SQLALCHEMY_DATABASE_URI")
MONGO_DATABASE_URI = os.getenv("MONGO_DATABASE_URI")
TENCENT_REDIS_URL = os.getenv("TENCENT_REDIS_URL")
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: commen_config.py
"""
import os
SMS_CONFIG = {
'app_id': os.getenv("SMS_APP_ID"),
'app_key': os.getenv("SMS_APP_KEY")
}
EMAIL_CONFIG = {
'KEFU_EMAIL': os.getenv("EMAIL_ACCOUNT"),
'PASSWORD': os.getenv("EMAIL_PASSWORD"),
'SMTP_SERVER': os.getenv("EMAIL_SMTP_SERVER"),
'KEFU_NAME': '晓兔',
'SUBJECT': '电子发票已开具'
}
IMG_HEAD_URL = "https://dev-1255927177.cos.ap-shanghai.myqcloud.com"
IMG_DOMAIN_URL = "https://dev-1255927177.file.myqcloud.com"
# 微信session_key
WX_SESSION_KEY = "W_S_K_"
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: wechat_config.py
"""
import os
SK_CONFIG = {
"app_id": os.getenv("SK_MINI_PROGRAM_APPID"),
"app_secret": os.getenv("SK_MINI_PROGRAM_APPSECRET"),
}
SSW_PAY_CONFIG = {
"mch_id": os.getenv("SSW_MCHID"),
"pay_key": os.getenv("SSW_PAY_KEY"),
"cert_path": os.getenv("SSW_PAY_SSL_CERT_PATH"),
"key_path": os.getenv("SSW_PAY_SSL_KEY_PATH"),
'callback_url': '/rent/wx_pay_callback',
'refund_callback_url': '/rent/refund_callback',
'domain': 'https://guide.ssw-htzn.com/v2/tour/'
}
XX_PAY_CONFIG = {
"mch_id": os.getenv("SSW_MCHID"),
"pay_key": os.getenv("SSW_PAY_KEY"),
"cert_path": os.getenv("SSW_PAY_SSL_CERT_PATH"),
"key_path": os.getenv("SSW_PAY_SSL_KEY_PATH"),
'callback_url': '/rent/wx_pay_callback',
'refund_callback_url': '/rent/refund_callback',
'domain': 'https://guide.ssw-htzn.com/v2/tour/'
}
pay_config_list = ["", "ssw", "xx"]
pay_config_dict = {
"ssw": SSW_PAY_CONFIG,
"xx": XX_PAY_CONFIG,
}
platform_config_list = ["", "sukang24h", ]
platform_config_dict = {
"sukang24h": SK_CONFIG,
}
platform_appid_config_list = [
"",
SK_CONFIG["app_id"], # 苏康24H 平台序号 ==>1
]
platform_appid_config_dict = {
"sukang24h": SK_CONFIG["app_id"],
}
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: __init__.py.py
"""
\ No newline at end of file
# -*- coding: utf-8 -*-
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, text
from sqlalchemy.dialects.mysql import INTEGER, TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base
db = SQLAlchemy(session_options={"autoflush": False})
class Base(db.Model):
__abstract__ = True
id = Column(INTEGER, primary_key=True)
created_at = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
@author:Aeolus
@file: __init__.py
@function:
@modify:
"""
from flask import Flask
from flask_cors import CORS
from flask_log_request_id import RequestID
from dotenv import load_dotenv
from models.base_model import db
from utils.my_redis_cache import redis_client
from utils.mylogger import set_logger
def create_app(config_name):
from config.env_path_config import env_path
load_dotenv(dotenv_path=env_path, verbose=True, override=True)
set_logger()
app = Flask("sukang24h")
from config.app_config import config
app.config.from_object(config[config_name])
CORS(app)
db.init_app(app)
redis_client.init_app(app)
RequestID(app)
from utils.middlewares import jwt_authentication, log_enter_interface, log_out_interface, close_db_session, \
get_platform, all_options_pass
app.before_request(log_enter_interface)
app.before_request(all_options_pass)
app.before_request(get_platform)
app.before_request(jwt_authentication)
app.after_request(log_out_interface)
app.after_request(close_db_session)
# todo register blueprint
from myapps.sukang24h.api import register_sukang_blueprint
register_sukang_blueprint(app)
return app
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: __init__.py.py
"""
from flask import Flask
from myapps.sukang24h.api.wx_auth import wx_auth_route
def register_sukang_blueprint(app: Flask):
prefix = "/sukang"
app.register_blueprint(wx_auth_route, url_prefix=prefix + "/wx_auth")
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: wx_auth.py
"""
import logging
import time
from flask import Blueprint, request, g, jsonify
from wechatpy.crypto import WeChatWxaCrypto
from config.commen_config import WX_SESSION_KEY
from config.wechat_config import platform_appid_config_dict, platform_appid_config_list
from models.base_model import db
from models.models import WxUser
from service.wechat_service import WxMPService
from utils.error_code import WX_LOGIN_DATA_ERROR, WX_LOGIN_CODE_ERROR, PHONE_NOT_BINDING_ERROR, TOKEN_EXPIRE_ERROR
from utils.jwt_util import generate_jwt
from utils.my_redis_cache import redis_client
from utils.my_response import BaseResponse
logger = logging.getLogger(__name__)
wx_auth_route = Blueprint('wx_auth', __name__)
@wx_auth_route.route('/test', methods=['POST'])
def my_test():
a = {'error_code': '0', 'error_message': 'Success'}
return jsonify(a)
@wx_auth_route.route('/login_wx_mp', methods=['POST'])
def mini_login():
"""
小程序登陆接口
:return:
"""
req = request.get_json()
platform = req.get('platform')
code = req.get('code')
encrypted_data = req.get('encryptedData')
iv = req.get('iv')
if not code or not encrypted_data or not iv or not platform:
return jsonify(WX_LOGIN_DATA_ERROR)
app_id = platform_appid_config_dict.get(platform)
logger.info(platform_appid_config_dict)
wx_client = WxMPService(platform)
user_session = wx_client.code_to_session(code)
logger.info(user_session)
if user_session is None:
logger.info(WX_LOGIN_CODE_ERROR)
return jsonify(WX_LOGIN_CODE_ERROR)
try:
user_info = WeChatWxaCrypto(user_session["session_key"], iv, app_id).decrypt_message(encrypted_data)
except:
try:
session_key = redis_client.get(WX_SESSION_KEY + user_session["openid"])
user_info = WeChatWxaCrypto(session_key, iv, app_id).decrypt_message(encrypted_data)
except:
return BaseResponse(error_code=500, error_message='user info decrypt error')
is_new_user = 0
wx_user_model = WxUser.query.filter_by(openid=user_session["openid"]).first()
if not wx_user_model:
wx_user_model = WxUser()
wx_user_model.status = 1
is_new_user = 1
wx_user_model.mini_program_open_id = user_session["openid"]
if user_session.get("unionid", None):
wx_user_model.unionid = user_session["unionid"]
wx_user_model.nick_name = user_info["nickName"]
wx_user_model.gender = user_info["gender"]
wx_user_model.language = user_info["language"]
wx_user_model.avatar_url = user_info["avatarUrl"]
wx_user_model.city = user_info["city"]
wx_user_model.province = user_info["province"]
wx_user_model.country = user_info["country"]
# wx_user_model.platform = 1
wx_user_model.platform = platform_appid_config_list.index(app_id)
db.session.add(wx_user_model)
db.session.commit()
token = generate_jwt(payload={"user_id": wx_user_model.id}, expiry=time.time() + 24 * 60 * 60)
return jsonify({
'customer_id': wx_user_model.id,
'token': token,
'error_code': 0,
"is_new_user": is_new_user
})
@wx_auth_route.route('/checkPhone', methods=["POST"])
def check_phone():
"""
验证手机号是否登陆过接口
:return:
"""
if not g.user.phone:
return jsonify(PHONE_NOT_BINDING_ERROR)
return BaseResponse(data={'phone_number': g.user.phone})
@wx_auth_route.route('/bindPhone', methods=['POST'])
def bind_phone():
'''
绑定手机号接口
:return:
'''
req = request.get_json()
encrypted_data = req.get('encryptedData', '')
iv = req.get('iv', '')
if not encrypted_data or not iv:
logger.info(WX_LOGIN_DATA_ERROR)
return BaseResponse(**WX_LOGIN_DATA_ERROR)
user_info = WxUser.query.filter_by(id=g.user.id).first()
if not user_info:
return BaseResponse(**WX_LOGIN_DATA_ERROR)
try:
session_key = redis_client.get(WX_SESSION_KEY + user_info.mini_program_open_id)
session_key = str(session_key, encoding='utf-8')
except Exception:
session_key = None
if not session_key:
return jsonify(TOKEN_EXPIRE_ERROR)
try:
app_id = platform_appid_config_list[g.user.platform]
phone_info = WeChatWxaCrypto(session_key, iv, app_id).decrypt_message(
encrypted_data)
except Exception as e:
logger.error(e)
return BaseResponse(error_code=500, error_message='phone info decrypt error')
user_info.phone = phone_info['phoneNumber']
db.session.add(user_info)
db.session.commit()
return BaseResponse()
This diff is collapsed. Click to expand it.
[tool.poetry]
name = "sukang24h"
version = "0.1.0"
description = ""
authors = ["冯佳佳 <478124129@qq.com>"]
[tool.poetry.dependencies]
python = "^3.7"
Flask = "^2.0.2"
Flask-Cors = "^3.0.10"
Flask-Log-Request-ID = "^0.10.1"
python-dotenv = "^0.19.0"
SQLAlchemy = "^1.4.25"
Flask-SQLAlchemy = "^2.5.1"
PyMySQL = "^1.0.2"
wechatpy = "^1.8.15"
flask-redis = "^0.4.0"
PyJWT = "^2.2.0"
pycryptodome = "^3.11.0"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: __init__.py.py
"""
# -*- coding: utf-8 -*-
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr
from config.commen_config import EMAIL_CONFIG
class EmailService:
def __init__(self):
self.from_addr = EMAIL_CONFIG['KEFU_EMAIL']
self.password = EMAIL_CONFIG['PASSWORD']
self.smtp_server = EMAIL_CONFIG['SMTP_SERVER']
def send_email(self, to_addr, rent, fp_dm, fp_hm, pdf_url):
content = '''
<html><body><b>尊敬的用户您好:</b> <br><p>感谢您使用灰晓兔!</p>
<p>灰晓兔已经为您开具订单{rent}的电子普通发票,发票数量共计1张,如下:</p>
<p>发票代码:{fp_dm},发票号码:{fp_hm},您可以点击“<a href="{pdf_url}">电子普通发票下载</a>”获取该发票文件;</p>
<p>电子普通发票是税务机关人口的有效收付款凭证,与纸质发票具有同等法律效力,可用于报销入账等。</p>
<p>如果您有任何疑问或建议,请联系18068402080。</p>
'''.format(rent=rent, fp_dm=fp_dm, fp_hm=fp_hm, pdf_url=pdf_url)
msg = MIMEText(content, 'html', 'utf-8')
lam_format_addr = lambda name, addr: formataddr((Header(name, 'utf-8').encode(), addr))
msg['From'] = lam_format_addr(EMAIL_CONFIG['KEFU_NAME'], self.from_addr)
msg['To'] = lam_format_addr(to_addr, to_addr)
msg['Subject'] = Header(EMAIL_CONFIG['SUBJECT'], 'utf-8').encode()
server = smtplib.SMTP_SSL(self.smtp_server, 465)
server.login(self.from_addr, self.password)
server.sendmail(self.from_addr, [to_addr], msg.as_string())
server.quit()
# -*- coding: utf-8 -*-
import datetime
import hashlib
import random
import time
from sqlalchemy.exc import SQLAlchemyError
from utils.Helper.Helper import get_format_date
from models.base_model import db
from models.feedback_models import Feedback, FeedbackImage
class FeedbackService():
@staticmethod
def getReplay(customer_id):
sql = '''
select feedbacks.id,feedbacks.remark, feedbacks.`status`, feedbacks.reply, feedbacks.created_at, feedback_images.url from feedbacks
right join feedback_images on feedback_images.feedback_id = feedbacks.id where feedbacks.customer_id = '{customer_id}'
'''.format(customer_id=customer_id)
info = db.session.execute(sql)
return info
@staticmethod
def create(customer_id, remark, telephone, pics):
feedback = FeedbackService.createFeedbackStub(customer_id, remark, telephone)
if feedback:
try:
db.session.add(feedback)
db.session.commit()
except SQLAlchemyError as e:
raise e
if len(pics) > 0:
for item in pics:
feedback_imgs = FeedbackService.createFeedbackImage(feedback.id, item)
if feedback_imgs:
try:
db.session.add(feedback_imgs)
db.session.commit()
except SQLAlchemyError as e:
raise e
@staticmethod
def createFeedbackStub(customer_id, remark, telephone):
feedback = Feedback()
feedback.customer_id = customer_id
feedback.remark = remark
feedback.telephone = telephone
feedback.status = 0
feedback.created_at = datetime.datetime.now()
feedback.updated_at = datetime.datetime.now()
return feedback
@staticmethod
def createFeedbackImage(feedback_id, pic):
feedbackImage = FeedbackImage()
feedbackImage.url = pic
feedbackImage.feedback_id = feedback_id
feedbackImage.created_at = datetime.datetime.now()
feedbackImage.updated_at = datetime.datetime.now()
return feedbackImage
@staticmethod
def guid():
timestamp = int(time.time())
ranstr = random.randint(9999, 9999999999)
return FeedbackService.MD5(str(timestamp) + str(ranstr)) + FeedbackService.MD5(str(ranstr))[0:8]
@staticmethod
def formReplayInfo(data):
da = []
for item in data:
cur_data = {}
cur_data['id'] = item.id
cur_data['created_at'] = get_format_date(item.created_at)
cur_data['remark'] = item.remark
cur_data['status'] = '未处理' if item.status == 0 else '已处理'
cur_data['reply'] = '暂无回复' if item.reply is None else item.reply
urls = []
urls.append(item.url)
cur_data['urls'] = urls
da.append(cur_data)
return da
@staticmethod
def MD5(info):
m = hashlib.md5()
m.update(info.encode("utf-8"))
return m.hexdigest()
# -*- coding: utf-8 -*-
import random
import string
from qcloudsms_py import SmsSingleSender, SmsMultiSender
from qcloudsms_py.httpclient import HTTPError
from config.commen_config import SMS_CONFIG
from utils.my_redis_cache import redis_client
class SMSService():
def __init__(self):
self.appid = SMS_CONFIG['app_id']
self.appKey = SMS_CONFIG['app_key']
try:
self.ssender = SmsSingleSender(self.appid, self.appKey)
self.msender = SmsMultiSender(self.appid, self.appKey)
except Exception as e:
print(e)
def create_code(self, length=4):
'''
生成验证码
:param length:
:return:
'''
verification = []
letters = string.digits
for i in range(length):
letter = letters[random.randint(0, 9)]
verification.append(letter)
return "".join(verification)
def make_code(self, phoneNumber):
'''
设置验证码300秒有效
:param phoneNumber:
:return:
'''
code = self.create_code(4)
redis_client.set('V_C' + str(phoneNumber), code, 300)
return code
def phoneSendCode(self, phoneNumber, tempId, sign='灰兔智能'):
'''
发送验证码
:param phoneNumber:
:param tempId:
:param sign:
:return:
'''
try:
result = self.ssender.send(0, 86, phoneNumber, tempId, sign=sign, extend="", ext='')
return result
except HTTPError as e:
print(e)
except Exception as e:
print(e)
return
def phoneSendCode(self, phoneNumber, tempId, sign='灰兔智能'):
'''
发送验证码
:param phoneNumber:
:param tempId:
:param sign:
:return:
'''
code = self.make_code(phoneNumber)
params = [code]
try:
result = self.ssender.send_with_param(86, phoneNumber, tempId, params, sign=sign, extend="", ext='')
return result
except HTTPError as e:
print(e)
except Exception as e:
print(e)
return
def phoneSendCodeWithContent(self, phoneNumber, tempId, Content, sign='灰兔智能'):
'''
发送验证码
:param phoneNumber:
:param tempId:
:param sign:
:return:
'''
try:
if isinstance(phoneNumber, list):
result = self.msender.send_with_param(86, phoneNumber, tempId, Content, sign=sign, extend="", ext='')
else:
result = self.ssender.send_with_param(86, phoneNumber, tempId, Content, sign=sign, extend="", ext='')
return result
except HTTPError as e:
print(e)
except Exception as e:
print(e)
return
def verificate(self, phoneNumber, code):
'''
判断验证码,-1验证码过期 -2验证码错误 0匹配
:param phoneNumber:
:param code:
:return:
'''
ver = redis_client.get('V_C' + str(phoneNumber))
if ver is None:
return -1
else:
if ver == str(code).encode('utf-8'):
return 0
else:
return -2
def phoneSendTips(self, user_name, spot_name, mac_no, num, from_phone, to_phone, sign='灰兔智能'):
params = [user_name, spot_name, mac_no, num, from_phone]
try:
result = self.ssender.send_with_param(86, to_phone, 766214, params, sign=sign, extend="", ext='')
return result
except HTTPError as e:
print(e)
except Exception as e:
print(e)
return
\ No newline at end of file
# -*- coding: utf-8 -*-
import hashlib
import json
import random
import string
import requests
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature
from utils.jwt_util import generate_jwt, verify_jwt
from models.user_models import CustomerModel as User
class UserService():
@staticmethod
def geneSalt(length=16):
keylist = [random.choice((string.ascii_letters + string.digits)) for i in range(length)]
return ("".join(keylist))
@staticmethod
def generate_auth_token(id, expiration=604800):
"""
生成用户接口权限Token
:param id: 用户ID
:param expiration: 过期时间,默认用7*24*60*60
:return:
"""
result = generate_jwt(payload={"user_id": id}, expiry=expiration)
return result
@staticmethod
def checkAuthCode(token=None):
auth_info = token.split("#")
if len(auth_info) != 2:
return False
try:
user_info = User.query.filter_by(id=auth_info[1]).first()
except Exception:
return False
if user_info is None:
return False
if auth_info[0] != UserService.geneAuthCode(user_info):
return False
if user_info.status != 1:
return False
return user_info
@staticmethod
def verify_auth_token(token):
result = verify_jwt(token)
return result
if __name__ == '__main__':
print(UserService().generate_auth_token(21, 60000000))
# -*- coding: utf-8 -*-
import xml.etree.ElementTree as ET
class UtilService(object):
@staticmethod
def dict_to_xml(dict_data):
'''
dict to xml
:param dict_data:
:return:
'''
xml = ["<xml>"]
for k, v in dict_data.items():
xml.append("<{0}>{1}</{0}>".format(k, v))
xml.append("</xml>")
return "".join(xml)
@staticmethod
def xml_to_dict(xml_data):
'''
xml to dict
:param xml_data:
:return:
'''
xml_dict = {}
root = ET.fromstring(xml_data)
for child in root:
xml_dict[child.tag] = child.text
return xml_dict
# -*- coding: utf-8 -*-
import datetime
import hashlib, requests, uuid, json
import xml.etree.ElementTree as ET
from requests.exceptions import HTTPError
from wechatpy import WeChatClient
from wechatpy.client.api import WeChatWxa
from wechatpy.pay import WeChatPay, calculate_signature
import logging
from wechatpy.session.redisstorage import RedisStorage
from config.commen_config import WX_SESSION_KEY
from config.wechat_config import pay_config_dict, platform_config_dict
from utils.my_redis_cache import redis_client
logger = logging.getLogger(__name__)
class WeChatPayService(WeChatPay):
"""
pass
"""
def __init__(self, app_id, config_name):
self.app_id = app_id
self.config = pay_config_dict[config_name]
mch_id = self.config["mch_id"]
pay_key = self.config["pay_key"]
cert_path = self.config["cert_path"]
key_path = self.config["key_path"]
super(WeChatPayService, self).__init__(app_id, pay_key, mch_id, mch_cert=cert_path, mch_key=key_path)
def unifiedorder(self, pay_data):
"""
:param pay_data:
:return:
"""
notify_url = self.config["domain"] + self.config["callback_url"]
logger.debug(notify_url)
pay_data["nonce_str"] = self.get_nonce_str()
result = self.order.create(notify_url=notify_url, **pay_data)
prepay_id = result.get('prepay_id')
pay_sign_data = {
'appId': self.app_id,
'timeStamp': pay_data.get('timeStamp'),
'nonceStr': pay_data.get('nonce_str'),
'package': 'prepay_id={0}'.format(prepay_id),
'signType': 'MD5',
}
pay_sign = calculate_signature(pay_sign_data, self.config["pay_key"])
pay_sign_data.pop('appId')
pay_sign_data['paySign'] = pay_sign
pay_sign_data['prepay_id'] = prepay_id
pay_sign_data['out_trade_no'] = pay_data.get('out_trade_no')
# TODO 加上时间,金额,个数
pay_sign_data['rent_time'] = datetime.datetime.fromtimestamp(int(pay_data.get('timeStamp'))).strftime(
'%Y-%m-%d %H:%M:%S')
pay_sign_data['total_fee'] = pay_data["total_fee"]
pay_sign_data["number"] = pay_data["attach"]["number"]
return pay_sign_data
def do_refund(self, refund_data):
"""
:param refund_data:
:return:
"""
for i in range(3):
try:
result = self.refund.apply(**refund_data)
return_code = result['return_code']
result_code = result.get('result_code')
if return_code != 'SUCCESS' or result_code != 'SUCCESS':
continue
else:
if result.get("err_code", None):
continue
return result
except Exception as e:
logger.info(e)
continue
@staticmethod
def get_nonce_str():
'''
获取随机字符串
:return:
'''
return str(uuid.uuid4()).replace('-', '')
def create_sign(self, pay_data, pay_key):
'''
生成签名
:return:
'''
stringA = '&'.join(["{0}={1}".format(k, pay_data.get(k)) for k in sorted(pay_data)])
stringSignTemp = '{0}&key={1}'.format(stringA, pay_key)
logger.info(stringSignTemp)
sign = hashlib.md5(stringSignTemp.encode("utf-8")).hexdigest()
return sign.upper()
def verify_pay_callbak_sign(self, callback_data):
"""
验证签名方法
:param callback_data: 微信回调的xml解析后的数据
:return:
"""
if callback_data['result_code'] != 'SUCCESS':
return False
sign = callback_data['sign']
callback_data.pop('sign')
gene_sign = calculate_signature(callback_data, self.config["pay_key"])
if sign != gene_sign:
return False
return True
def query_refund_info(self, refund_data):
result = self.refund.query(**refund_data)
return result
class WxMPService(WeChatClient):
def __init__(self, config_name):
config = platform_config_dict[config_name]
app_id = config["app_id"]
app_secret = config["app_secret"]
session_interface = RedisStorage(
redis_client,
prefix="wx"
)
super(WxMPService, self).__init__(app_id, app_secret, session=session_interface)
def code_to_session(self, code):
res = self.wxa.code_to_session(code)
open_id = res["openid"]
session_key = res["session_key"]
try:
old_session_key = redis_client.get(WX_SESSION_KEY + open_id)
except:
old_session_key = None
if old_session_key:
res["session_key"] = str(old_session_key, encoding='utf-8')
try:
redis_client.set(WX_SESSION_KEY + open_id, session_key)
except Exception as e:
raise e
return res
if __name__ == '__main__':
target_wechat = WeChatPayService()
# -*- coding: utf-8 -*-
from utils.Helper.Helper import get_format_date
from models.guide_record_models import GuideRecordModel
from models.spot_models import Spot
class WeGuideService():
@staticmethod
def getMyRecord(customer_id, page, limit):
record_list = \
GuideRecordModel.query.filter(GuideRecordModel.customer_id == customer_id and GuideRecordModel.is_pay == 1).order_by(
GuideRecordModel.pay_time.desc()).all()[page - 1: limit]
return record_list
@staticmethod
def formatWeGuideRecordModel(weguide_info):
data = []
for item in weguide_info:
cur_data = {}
cur_data['address'] = Spot.query.filter_by(id=item.spot_id).first().spotname
cur_data['guide_no'] = item.guide_no
cur_data['total'] = item.total
cur_data['real_total'] = item.real_total
cur_data['pay_time'] = get_format_date(item.pay_time)
data.append(cur_data)
return data
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
@author:Aeolus
"""
import os
import logging
from myapps.sukang24h import create_app
logger = logging.getLogger(__name__)
app = create_app(os.getenv('RUN_ENV', 'prod'))
logger.info("run server")
if __name__ == '__main__':
app.run('127.0.0.1', 8893, debug=True)
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: error_code.py
"""
# BASE_RESPONSE = {
# "error_code": "0",
# "error_message": "Success"
# }
# 用户相关 10开头
TOKEN_NOT_VALID_ERROR = {
"error_code": "1001",
"error_message": "无效的token"
}
TOKEN_NOT_PROVIDED_ERROR = {
"error_code": "1002",
"error_message": "token未提供"
}
TOKEN_EXPIRE_ERROR = {
"error_code": "1003",
"error_message": "token超时"
}
PHONE_NOT_BINDING_ERROR = {
"error_code": "1004",
"error_message": "未绑定手机号"
}
PHONE_NOT_NULL_ERROR = {
"error_code": "1005",
"error_message": "手机号为空"
}
PHONE_NOT_VALID_ERROR = {
"error_code": "1006",
"error_message": "无效的手机号"
}
USER_ALREADY_REGISTER_ERROR = {
"error_code": "1007",
"error_message": "用户已注册"
}
VERIFICATION_CODE_NULL_ERROR = {
"error_code": "1008",
"error_message": "验证码为空"
}
VERIFICATION_CODE_INVALID_ERROR = {
"error_code": "1009",
"error_message": "验证码已失效"
}
VERIFICATION_CODE_ERROR = {
"error_code": "1010",
"error_message": "验证码错误"
}
PASSWORD_ERROR = {
"error_code": "1011",
"error_message": "账号或密码错误"
}
## 微信登陆相关
WX_LOGIN_DATA_ERROR = {
"error_code": 3001,
"error_message": "微信登录数据错误"
}
WX_LOGIN_CODE_ERROR = {
"error_code": 3002,
"error_message": "微信登录code值错误"
}
WX_OPENID_NOT_GET_ERROR = {
"error_code": 3003,
"error_message": "微信OpenId获取失败,请刷新重试"
}
### 微信支付相关
WE_MINIAPP_PAY_FAIL = {
"error_code": 3101,
"error_message": "小程序下单失败"
}
### 消息推送相关
WXBizMsgCrypt_OK = {
"error_code": 0,
"error_message": "WXBizMsgCrypt_OK"
}
WXBizMsgCrypt_ValidateSignature_Error = {
"error_code": 4001,
"error_message": "验证签名错误"
}
WXBizMsgCrypt_ParseXml_Error = {
"error_code": 4002,
"error_message": "解析xml错误"
}
WXBizMsgCrypt_ComputeSignature_Error = {
"error_code": 4003,
"error_message": "计算签名错误"
}
WXBizMsgCrypt_IllegalAesKey = {
"error_code": 4004,
"error_message": "Aes key非法错误"
}
WXBizMsgCrypt_ValidateAppid_Error = {
"error_code": 4005,
"error_message": "appid错误"
}
WXBizMsgCrypt_EncryptAES_Error = {
"error_code": 4006,
"error_message": "aes加密错误"
}
WXBizMsgCrypt_DecryptAES_Error = {
"error_code": 4007,
"error_message": "aes解密错误"
}
WXBizMsgCrypt_IllegalBuffer = {
"error_code": 4008,
"error_message": "illegal buffer"
}
WXBizMsgCrypt_EncodeBase64_Error = {
"error_code": 4009,
"error_message": "base64加密错误"
}
WXBizMsgCrypt_DecodeBase64_Error = {
"error_code": 4010,
"error_message": "base64解密错误"
}
WXBizMsgCrypt_GenReturnXml_Error = {
"error_code": 4011,
"error_message": "gen return xml error"
}
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/30
@file: jwt_util.py
@function:
@modify:
"""
import jwt
from flask import current_app
def generate_jwt(payload, expiry, secret=None):
"""
生成jwt
:param payload: dict 载荷
:param expiry: datetime 有效期
:param secret: 密钥
:return: jwt
"""
_payload = {'exp': expiry}
_payload.update(payload)
if not secret:
secret = current_app.config['SECRET_KEY']
token = jwt.encode(_payload, secret, algorithm='HS256')
return token
def verify_jwt(token, secret=None):
"""
检验jwt
:param token: jwt
:param secret: 密钥
:return: dict: payload
"""
if not secret:
secret = current_app.config['SECRET_KEY']
try:
payload = jwt.decode(token, secret, algorithms=['HS256'])
except jwt.PyJWTError:
payload = None
return payload
if __name__ == '__main__':
import time
from config.env_path_config import env_path
from dotenv import load_dotenv
load_dotenv(dotenv_path=env_path, verbose=True, override=True)
import os
SECRET_KEY = os.getenv('SECRET_KEY')
token = generate_jwt({"user_id": 1}, time.time() + 6000, SECRET_KEY)
print(token)
# for i in range(10):
# result = verify_jwt(token, 'secret')
# print(result)
# print(time.time())
# time.sleep(1)
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/26
@file: middlewares.py
@function:
@modify:
"""
import logging
from flask import g, request, url_for, current_app, make_response, jsonify
from config.wechat_config import platform_config_list
from models.models import WxUser
from utils.error_code import TOKEN_NOT_VALID_ERROR
from utils.my_response import BaseResponse
from utils.jwt_util import verify_jwt
logger = logging.getLogger(__name__)
def log_enter_interface():
"""
日志打印进入接口
:return:
"""
logger.info("######################### 进入 {} 接口 ################################ ".format(request.path))
def log_out_interface(environ):
"""
日志打印退出接口
:return:
"""
logger.info("######################### 退出 {} 接口 ################################\n".format(request.path))
return environ
def close_db_session(environ):
from models.base_model import db
db.session.close()
return environ
"""用户认证机制==>每次请求前获取并校验token"""
"@myapps.before_request 不使@调用装饰器 在 init文件直接装饰"
def jwt_authentication():
"""
1.获取请求头Authorization中的token
2.判断是否以 Bearer开头
3.使用jwt模块进行校验
4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存
"""
if current_app.name == "sukang24h":
NO_AUTH_CHECK_URL = [url_for('wx_auth.my_test'),
url_for('wx_auth.mini_login'),
# url_for('rent.wx_pay_callback'),
]
else:
NO_AUTH_CHECK_URL = []
if request.path not in NO_AUTH_CHECK_URL:
token = request.headers.get('Authorization')
# "校验token"
payload = verify_jwt(token)
# "判断token的校验结果"
if payload:
# "获取载荷中的信息赋值给g对象"
user_id = payload.get('user_id')
if not user_id:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
g.user = WxUser.query.filter_by(id=user_id).first()
else:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
def get_platform():
"""
:return:
"""
g.platform = request.headers.get('platform', "sukang24h")
def all_options_pass():
"""
:return:
"""
if request.method == "OPTIONS":
headers = {'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST',
'Access-Control-Allow-Headers':
'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform',
}
return make_response((jsonify({'error_code': 0}), 200, headers))
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: my_redis_cache.py
"""
from flask_redis import FlaskRedis
redis_client = FlaskRedis(config_prefix='TENCENT_REDIS')
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: my_response.py
@function:
@modify:
"""
from flask import Response
from flask.json import dumps
class BaseResponse(Response):
def __init__(self, data=None, error_code=0, error_message='Success', *args, **kwargs):
if data is not None:
result = dumps(dict(data=data, error_code=error_code, error_message=error_message, *args, **kwargs))
else:
result = dumps(dict(error_code=error_code, error_message=error_message, *args, **kwargs))
Response.__init__(self, result, mimetype='application/json')
import logging
from flask_log_request_id import RequestIDLogFilter
def set_logger():
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(
logging.Formatter(
"%(asctime)s - %(filename)s - %(funcName)s -[line:%(lineno)d] - %(levelname)s - request_id=%(request_id)s: %(message)s",
"%Y-%m-%d %H:%M:%S"))
console_handler.addFilter(RequestIDLogFilter())
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger().addHandler(console_handler)
import base64
import json
from Crypto.Cipher import AES
class WXBizDataCrypt:
def __init__(self, appId, sessionKey):
self.appId = appId
self.sessionKey = sessionKey
def decrypt(self, encryptedData, iv):
sessionKey = base64.decodebytes(bytes(self.sessionKey, encoding='utf8'))
encryptedData = base64.decodebytes(bytes(encryptedData, encoding='utf8'))
iv = base64.decodebytes(bytes(iv, encoding='utf8'))
cipher = AES.new(sessionKey, AES.MODE_CBC, iv)
des_str = cipher.decrypt(encryptedData)
print("==================================")
print(des_str)
des_str = self._unpad(des_str)
print(des_str)
des_str = str(des_str,encoding='utf-8')
decrypted = json.loads(des_str)
if decrypted['watermark']['appid'] != self.appId:
raise Exception('Invalid Buffer')
return decrypted
def _unpad(self, s):
return s[:-ord(s[len(s) - 1:])]
if __name__ == '__main__':
appId = 'wx3185fb4a3633beb0'
sessionKey='S7CMDfC6jXJKSaWKanG8oQ=='
encryptedData='E7LZhvK7mOcaYsv9xcAfsBN9eSbzFh9FyMtFJ0zsFB0M62zRJ0cosZWksUujUR5WYUmNoIfIJnTIF8gRskxxbFU3fm5X7z4ChZecMSaFM65aEK1suRUD1U0ubB7mOwBBlY4ftdPT5kRwWgXKVkM4VAkYGN8A4fjWE93yGtjzxXs9dypQkCLSNWs6Kw5USEzjhtDZnptVy+lHF5fTXRuzoCstW2Cto4YI3G9hmnS64QuWjRteSqIgh8GN1zEPN0dROJjaWBjqraBCt/BfMsk4HBeL4PA75K8WdqVgKGfQ7/rnmPFOsNXWfajx9jl7XcrfoPaaPL1DmIJ1BlQne2GuLFtzZ3O4/8cdVQ9Lb0N/3kFAzjgzNFNLSYj2VNctmWyLdWi8hH90yslvrODIhMzIsuux2GIAfp0rQd/iVIVvtd7PXBOCe5iZ7aaqD0b0mLF4CmsuBpl8Eh20ZHkYw2SqO0x9uFrS/gy1vwtkmsTpcDw='
iv = 'DQcmcXyQkU+VKqb2mKmasQ=='
pc = WXBizDataCrypt(appId, sessionKey)
pc.decrypt(encryptedData, iv)
#
# -*- coding: utf-8 -*-
import base64
import hashlib
import random
import socket
import string
import struct
import time
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
from utils.error_code.wx_error import WXBizMsgCrypt_OK, WXBizMsgCrypt_ParseXml_Error, WXBizMsgCrypt_EncryptAES_Error, \
WXBizMsgCrypt_DecryptAES_Error, WXBizMsgCrypt_IllegalBuffer, WXBizMsgCrypt_ValidateAppid_Error, \
WXBizMsgCrypt_ValidateSignature_Error
class FormatException(Exception):
pass
def throw_exception(message, exception_class=FormatException):
raise exception_class(message)
class SHA1:
"""计算公众平台的消息签名接口"""
def getSHA1(self, token, timestamp, nonce, encrypt):
""" 用SHA1算法生成安全签名
@param token: 票据
@param timestamp: 时间戳
@param encrypt: 密文
@param nonce: 随机字符串
@return: 安全签名
"""
try:
sortlist = [token, timestamp, nonce, encrypt]
sortlist.sort()
sha = hashlib.sha1()
sha.update(("".join(sortlist)).encode('utf-8'))
return 0, sha.hexdigest()
except Exception as e:
print(e)
return -40003, None
class XMLParse:
"""提供提取消息格式中的密文及生成回复消息格式的接口"""
# xml消息模板
AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
<MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
<TimeStamp>%(timestamp)s</TimeStamp>
<Nonce><![CDATA[%(nonce)s]]></Nonce>
</xml>"""
def extract(self, xmltext):
""" 提取出xml数据包中的加密消息
@param xmltext: 待提取的xml字符串
@return: 提取出的加密消息字符串
"""
try:
xml_tree = ET.fromstring(xmltext)
encrypt = xml_tree.find("Encrypt")
touser_name = xml_tree.find("ToUserName")
return WXBizMsgCrypt_OK['error_code'], encrypt.text, touser_name.text
except Exception as e:
print(e)
return WXBizMsgCrypt_ParseXml_Error['error_code'], None, None
def generate(self, encrypt, signature, timestamp, nonce):
""" 生成xml消息
@param encrypt: 加密后的消息密文
@param signature: 安全签名
@param timestamp: 时间戳
@param nonce: 随机字符串
@return: 生成的xml字符串
"""
resp_dict = {
'msg_encrypt': encrypt,
'msg_signaturet': signature,
'timestamp': timestamp,
'nonce': nonce,
}
resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
return resp_xml
class PKCS7Encoder():
"""提供基于PKCS7算法的加解密接口"""
block_size = 32
def encode(self, text):
""" 对需要加密的明文进行填充补位
@param text: 需要进行填充补位操作的明文
@return: 补齐明文字符串
"""
text_length = len(text)
# 计算需要填充的位数
amount_to_pad = self.block_size - (text_length % self.block_size)
if amount_to_pad == 0:
amount_to_pad = self.block_size
# 获得补位所用的字符
pad = chr(amount_to_pad)
return text + pad * amount_to_pad
def decode(self, decrypted):
""" 删除解密后明文的补位字符
@param decrypted: 解密后的明文
@return: 删除补位字符后的明文
"""
pad = ord(decrypted[-1])
if pad < 1 or pad > 32:
pad = 0
return decrypted[:-pad]
class Prpcrypt(object):
"""提供接收和推送给公众平台消息的加解密接口"""
def __init__(self, key):
# self.key = base64.b64decode(key+"=")
self.key = key
# 设置加解密模式为AES的CBC模式
self.mode = AES.MODE_CBC
def encrypt(self, text, appid):
""" 对明文进行加密
@param text: 需要加密的明文
@return: 加密得到的字符串
"""
# 16位随机字符串添加到明文开头
text = self.get_random_str() + str(struct.pack("I", socket.htonl(len(text)))) + text + appid
# 使用自定义的填充方式对明文进行补位填充
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# 加密
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# 使用BASE64对加密后的字符串进行编码
return WXBizMsgCrypt_OK['error_code'], base64.b64encode(ciphertext)
except Exception as e:
print(e)
return WXBizMsgCrypt_EncryptAES_Error['error_code'], None
def decrypt(self, text, appid):
""" 对解密后的明文进行补位删除
@param text: 密文
@return: 删除填充补位后的明文
"""
try:
cryptor = AES.new(self.key, self.mode, self.key[:16])
# 使用BASE64对密文进行解码,然后AES-CBC解密
plain_text = cryptor.decrypt(base64.b64decode(text))
except Exception as e:
print(e)
return WXBizMsgCrypt_DecryptAES_Error['error_code'], None
try:
pad = ord(plain_text[len(plain_text) - 1:])
# 去掉补位字符串
# pkcs7 = PKCS7Encoder()
# plain_text = pkcs7.encode(plain_text)
# 去除16位随机字符串
content = plain_text[16:-pad]
xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
xml_content = str(content[4: xml_len + 4], encoding='utf-8')
from_appid = str(content[xml_len + 4:], encoding='utf-8')
except Exception as e:
print(e)
return WXBizMsgCrypt_IllegalBuffer['error_code'], None
if from_appid != appid:
return WXBizMsgCrypt_ValidateAppid_Error['error_code'], None
return WXBizMsgCrypt_OK['error_code'], xml_content
def get_random_str(self):
""" 随机生成16位字符串
@return: 16位字符串
"""
rule = string.ascii_letters + string.digits
str = random.sample(rule, 16)
return "".join(str)
class WXBizMsgCrypt(object):
# 构造函数
# @param sToken: 公众平台上,开发者设置的Token
# @param sEncodingAESKey: 公众平台上,开发者设置的EncodingAESKey
# @param sAppId: 企业号的AppId
def __init__(self, sToken, sEncodingAESKey, sAppId):
try:
self.key = base64.b64decode(sEncodingAESKey + "=")
assert len(self.key) == 32
except:
throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
self.token = sToken
self.appid = sAppId
def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
# 将公众号回复用户的消息加密打包
# @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
# @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
# @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
# sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
# return:成功0,sEncryptMsg,失败返回对应的错误码None
pc = Prpcrypt(self.key)
ret, encrypt = pc.encrypt(sReplyMsg, self.appid)
if ret != 0:
return ret, None
if timestamp is None:
timestamp = str(int(time.time()))
# 生成安全签名
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.token, timestamp, sNonce, encrypt)
if ret != 0:
return ret, None
xmlParse = XMLParse()
return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)
def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
# 检验消息的真实性,并且获取解密后的明文
# @param sMsgSignature: 签名串,对应URL参数的msg_signature
# @param sTimeStamp: 时间戳,对应URL参数的timestamp
# @param sNonce: 随机串,对应URL参数的nonce
# @param sPostData: 密文,对应POST请求的数据
# xml_content: 解密后的原文,当return返回0时有效
# @return: 成功0,失败返回对应的错误码
# 验证安全签名
xmlParse = XMLParse()
encrypt = sPostData["Encrypt"]
touser_name = sPostData["ToUserName"]
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, encrypt)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return WXBizMsgCrypt_ValidateSignature_Error['error_code'], None
pc = Prpcrypt(self.key)
ret, xml_content = pc.decrypt(encrypt, self.appid)
return ret, xml_content
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: __init__.py.py
"""
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment