Commit b67e0439 by 冯佳佳

nfc-read-write

parent b6e29a4d
......@@ -7,7 +7,8 @@ __pycache__/
# Distribution / packaging
.Python
env/
config/.env/
venv/
build/
develop-eggs/
dist/
......@@ -29,6 +30,8 @@ __pycache__/
.txt
*.zip
.log*
*.env
*.rar
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
......@@ -75,3 +78,8 @@ target/
# Media
*.jpg
#Document
*.xls
env_path_config.py
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/26
@file: __init__.py.py
@function:
@modify:
"""
import logging
from flask import Flask
from flask_log_request_id import RequestID
from flask_sqlalchemy import SQLAlchemy
from config import config
from mylogger import MyLogger
from utils import jwt_authentication
db = SQLAlchemy()
def create_app(config_name):
print(config_name)
app = Flask(__name__)
app.config.from_object(config[config_name])
db.init_app(app)
RequestID(app)
MyLogger()
# app.before_request(jwt_authentication)
# todo register blueprint
from app.api import api
from app.api.repair import repair
app.register_blueprint(api, url_prefix='/api')
app.register_blueprint(repair, url_prefix='/api/repair')
return app
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/26
@file: __init__.py.py
@function:
@modify:
"""
import logging
from flask import Blueprint, url_for
logger = logging.getLogger(__name__)
api = Blueprint('api', __name__)
@api.route('test', methods=['GET', 'POST'])
def get_test():
logger.info('test')
return 'test'
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: repair.py
@function:
@modify:
"""
import logging
from flask import Blueprint, request
from app import db
from models.Repairs import PowerRepairModel
from utils.return_code import BASE_RESPONSE, PARAMS_INVALID, DUPLICATE_RECORD, SQL_ERROR
logger = logging.getLogger(__name__)
repair = Blueprint('repair', __name__)
@repair.route("/update", methods=["POST"])
def update_repair():
nfc_id = request.values.get("nfc_id", None)
power_sn = request.values.get("sn", None)
repair_content = request.values.get("repair_content", None)
repair_man = request.values.get("repair_man", None)
index = request.values.get("index", None)
if not nfc_id or not power_sn or not repair_content or not repair_man or int(index) > 5:
return BASE_RESPONSE(**PARAMS_INVALID)
logger.info('=======================')
result = PowerRepairModel.query.filter_by(power_sn=power_sn, index=index).first()
if result:
return BASE_RESPONSE(**DUPLICATE_RECORD)
model = PowerRepairModel()
model.power_sn = power_sn
model.nfc_id = nfc_id
model.repair_content = repair_content
model.repair_man = repair_man
model.index = index
try:
db.session.add(model)
db.session.commit()
except Exception as e:
logger.info(e)
return BASE_RESPONSE(**SQL_ERROR)
return BASE_RESPONSE()
@repair.route("/list", methods=["POST"])
def list_repair():
power_sn = request.values.get("sn", None)
if not power_sn:
return BASE_RESPONSE(**PARAMS_INVALID)
logger.info('=======================')
return_data = []
result = PowerRepairModel.query.filter_by(power_sn=power_sn).order_by(PowerRepairModel.index.asc()).all()
if result:
for i in result:
return_data.append({"sn": i.power_sn,
"nfc_id": i.nfc_id,
"repair_content": i.repair_content,
"repair_man": i.repair_man,
"index": i.index,
})
else:
return_data = []
return BASE_RESPONSE(data=return_data)
#!usr/bin/env python
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/25
@file: app.py.py
@file: app_config.py
@function:
@modify:
"""
import os
from flask import copy_current_request_context
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY');
SECRET_KEY = os.environ.get('SECRET_KEY')
SQLALCHEMY_DATABASE_URI = os.environ.get("SQLALCHEMY_DATABASE_URI")
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_RECORD_QUERIES = True
SQLALCHEMY_POOL_SIZE = 10
SQLALCHEMY_POOL_RECYCLE = 1800
JWT_SECRET = SECRET_KEY
TENCENT_REDIS_URL = os.getenv("TENCENT_REDIS_URL")
@staticmethod
def init_app(app):
......@@ -30,23 +31,14 @@ class Config:
class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:N58uTiMT#tt*@sh-cdb-9yr5zhu0.sql.tencentcdb.com:61088/suishenwan'
class ProductionConfig(Config):
print("here is ProductionConfig")
DEBUG = False
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:Boosal2014!!@sh-cdb-kzmpyjqw.sql.tencentcdb.com:63037/suishenwan'
@classmethod
def init_app(cls, app):
Config.init_app(app)
config = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'dev': DevelopmentConfig,
'prod': ProductionConfig,
'default': DevelopmentConfig
}
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: base_config.py
"""
import os
SECRET_KEY = os.getenv('SECRET_KEY')
SQLALCHEMY_DATABASE_URI = os.getenv("SQLALCHEMY_DATABASE_URI")
MONGO_DATABASE_URI = os.getenv("MONGO_DATABASE_URI")
TENCENT_REDIS_URL = os.getenv("TENCENT_REDIS_URL")
ACTION_PWD = os.getenv("ACTION_PWD")
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: commen_config.py
"""
import os
ONENET_JS_CONFIG = {
"encoding_AES_key": os.getenv("ONENET_AES_KEY"),
"he_cloud_token": os.getenv("ONENET_TOKEN"),
"url": os.getenv("ONENET_JS_URL"),
"api_key": os.getenv("ONENET_JS_API_KEY"),
}
ONENET_OLD_CONFIG = {
"encoding_AES_key": os.getenv("ONENET_AES_KEY"),
"he_cloud_token": os.getenv("ONENET_TOKEN"),
"url": os.getenv("ONENET_JS_URL"),
"api_key": os.getenv("ONENET_OLD_API_KEY"),
"he_cloud_device_id": ["80092779", ]
}
ONENET_CONFIG = {
"encoding_AES_key": os.getenv("ONENET_AES_KEY"),
"he_cloud_token": os.getenv("ONENET_TOKEN"),
"url": os.getenv("ONENET_URL"),
"api_key": os.getenv("ONENET_API_KEY"),
"he_cloud_device_id": ["617584147", "617445180", "617445217", "617445252", "617445290", "617445330", "617444881",
"619509921", "619510055", "617444853", "620550194", "620561306", "620761823", "624476742",
"624476708", "628296149", "628649303", "628649346", "628649432", "628649498", "628919054",
"631690510", "631693136", "631693725", "631693885", "670152192", "672400636", "672400567",
"617584147", "617444636", "702673511", "702673648", "702673831", "702673998", "710158579",
"666663005", "666662358", "740525052", "740703987", "666662625", "617445054", "751837557",
"752301706", "752783519", "752813085", "756288603", "758408472",
# 测试服机柜
"617584805", "728554060", "740699918", "740704332", "740704407"]
}
INVOICE_CONFIG = {
"url": os.getenv("INVOICE_URL"),
"app_id": os.getenv("INVOICE_APP_ID"),
"app_key": os.getenv("INVOICE_APP_KEY"),
"cert_path": os.getenv("INVOICE_CERT"),
"xsf_nsrsbh": os.getenv("INVOICE_NSRSBH"), # 销售方纳税人识别号
"spbm": os.getenv("INVOICE_SPBM"), # 商品编码
"xsf_mc": os.getenv("INVOICE_XSFMC"),
"content": "*旅游娱乐服务*讲解服务",
"dizhi_dianhua": "苏州工业园区林泉街399号东南大学国家大学科技园(苏州)南工院2#210室 0512-62748449",
"yinhang_zhanghao": "招商银行苏州分行营业部512905030310801",
"kaipiaoren": "随身玩",
"shoukuanren": "随身玩",
"fuheren": "曹云香"
}
SMS_CONFIG = {
'app_id': os.getenv("SMS_APP_ID"),
'app_key': os.getenv("SMS_APP_KEY")
}
CMQ_CONFIG = {
'TENCENT_MQ_HOST': 'https://cmq-queue-sh.api.qcloud.com',
'TENCENT_MQ_SECRET_ID': os.getenv("TENCENT_MQ_SECRET_ID"),
'TENCENT_MQ_SECRET_KEY': os.getenv("TENCENT_MQ_SECRET_KEY"),
'TENCENT_OPEN_TOPIC': os.getenv("TENCENT_OPEN_TOPIC"),
}
EMAIL_CONFIG = {
'KEFU_EMAIL': os.getenv("EMAIL_ACCOUNT"),
'PASSWORD': os.getenv("EMAIL_PASSWORD"),
'SMTP_SERVER': os.getenv("EMAIL_SMTP_SERVER"),
'KEFU_NAME': '晓兔',
'SUBJECT': '电子发票已开具'
}
BATTERY = {
"sn_length": 18
}
ZHUOZHENGYUAN_AGENT_TOTAL_LIST = {
# 机柜号
"mac_no": ['1909033201020171', '1909033201020172', '1909033201020173', '1909033201020174',
'1909033201020175'],
# 改变景区策略
"spot_id_strategy": {"new_bussiness_id": 0, "new_spot_id": 51, "chance": 40},
# 修改策略
"agent_total_strategy": [
{"total": 2500, "agent_total": 2500, "chance": 0, "return_time_range": [0, 60]},
{"total": 3500, "agent_total": 2500, "chance": 50, "return_time_range": [40, 60]},
{"total": 6000, "agent_total": 2500, "chance": 50, "return_time_range": [40, 60]},
]
}
HANSHANSI_AGENT_TOTAL_LIST = {
# 机柜号
"mac_no": ['1708080101090097', '1909033201020170'],
# 改变景区策略
"spot_id_strategy": {"new_bussiness_id": 0, "new_spot_id": 49, "chance": 40},
# 修改策略
"agent_total_strategy": {"total": 2500, "agent_total": 2000, "chance": 100}
}
SHIZILIN_AGENT_TOTAL_LIST = {
# 机柜号
"mac_no": ['1708080101090080', '1909033201020165'],
# 改变景区策略
"spot_id_strategy": {"new_bussiness_id": 0, "new_spot_id": 50, "chance": 40},
}
ZHOUZHUANG_AGENT_TOTAL_LIST = {
# 机柜号
"mac_no": ['1904280001010116', '1904280001010117'],
# 修改策略
"agent_total_strategy": {"total": 3500, "agent_total": 2500, "chance": 100},
}
LOGIN_TYPE = {
'code_login': 1,
'token_login': 2,
'send_code': 3,
'password': 4
}
AGENT_STATUS = {
'1': '超级管理员',
'2': '代理商',
'3': '调试',
'4': '对账',
'5': '现场',
'6': '客服专员'
}
ACCOUNT_STATUS = {
'on_use': 1,
'delete': 2
}
TAKEOUT_RECORD_MAC_NO = ('1909033201020158', '1909033201020159',)
TAKEOUT_TOOL_TIPS_PHONE = [
'13505357700',
'13755790666',
'15279120691',
'13870952089'
]
# 挪出订单的景区对应编号
move_spot_ids = {16: 50, 42: 51, 48: 49}
PLATFORM = {
"xiaotu": 1,
"xiaodaoyou": 2
}
AVAILABLE_POWER = 50
RENT_PRODUCTION_NAME = "晓兔智能讲解器"
# Redis 前缀
# 订单出售锁
RENT_SALE_LOCK = "R_S_L_"
# 用户prepay_id
USER_RENT_PREPAY_ID = "U_R_P_"
# 微信session_key
WX_SESSION_KEY = "W_S_K_"
# 微信公众号access_token
WX_ACCESS_TOKEN = "W_A_T_"
# 微信公众号access_token过期时间
WX_ACCESS_TOKEN_EXPIRE = "W_A_T_E_"
COUPON_QRCODE_BIND = "C_Q_B_"
COUPON_QRCODE_GET = "C_Q_G_"
REDPACK_SESSION_KEY = "R_S_K_"
# bug短信通知频率锁
BUG_SMS_LOCK = "B_S_L_"
# 机柜讲解器上报数据丢失
MACHINE_POWER_MISS_KEY = "M_P_M_K_"
IMG_HEAD_URL = "https://dev-1255927177.cos.ap-shanghai.myqcloud.com"
IMG_DOMAIN_URL = "https://dev-1255927177.file.myqcloud.com"
# -*- coding":" utf-8 -*-
import os
is_prod = os.getenv("RUN_ENV", "test") == 'prod'
##生产环境
ctrip_config = {
"account": "fdfb2f240f06d377",
"secret": "d489064d61c8ff2df319353af3d92477",
"aes_secret": "7339b8630be49ce3",
"aes_iv": "1f288cac7be893d1",
"notice_url": "https://ttdentry.ctrip.com/ttd-connect-orderentryapi/supplier/order/notice.do",
# "notice_url": "https://ttdstp.ctrip.com/api/order/notice.do",
"encrypt_path": "/data/www/tour_guide/ctrip_aes_encrypt.php",
"decrypt_path": "/data/www/tour_guide/ctrip_aes_decrypt.php"
} if is_prod else {
"account": "aedba602d528cc8f",
"secret": "49c5071e88245130e01dd8ee74ecfca7",
"aes_secret": "3d15447926ba4580",
"aes_iv": "4afce4f66cf19df0",
"notice_url": "https://ttdstp.ctrip.com/api/order/notice.do",
"encrypt_path": "/data/www/tour_guide/ctrip_aes_encrypt.php",
"decrypt_path": "/data/www/tour_guide/ctrip_aes_decrypt.php"
}
CMQ_CONFIG = {
'TENCENT_MQ_HOST': 'https://cmq-queue-sh.api.qcloud.com',
'TENCENT_MQ_SECRET_ID': 'AKIDQp9OXawOAqWAjCAr9LRRL9vcEj8QMkke',
'TENCENT_MQ_SECRET_KEY': 'wnXDmFbqy1yoXhmldQ8kEf1GOPLBufMy',
'TENCENT_OPEN_TOPIC': 'ctrip' if is_prod else 'ctrip',
}
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: socket_config.py
"""
import os
NFC_CONFIG = {
"ip": os.getenv("NFC_SOCKET_IP"),
"port": os.getenv("NFC_SOCKET_PORT"),
"local_ip": "127.0.0.1"
}
!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: wechat_config.py
"""
import os
XT_CONFIG = {
"app_id": os.getenv("XT_MINI_PROGRAM_APPID"),
"app_secret": os.getenv("XT_MINI_PROGRAM_APPSECRET"),
}
XDY_MINI_PROGRAM_CONFIG = {
"app_id": os.getenv("XDY_MINI_PROGRAM_APPID"),
"app_secret": os.getenv("XDY_MINI_PROGRAM_APPSECRET"),
}
SXXJ_MINI_PROGRAM_CONFIG = {
"app_id": os.getenv("SXXJ_MINI_PROGRAM_APPID"),
"app_secret": os.getenv("SXXJ_MINI_PROGRAM_APPSECRET"),
}
SJY_CONFIG = {
"app_id": os.getenv("SJY_MINI_PROGRAM_APPID"),
"app_secret": os.getenv("SJY_MINI_PROGRAM_APPSECRET"),
}
SSW_PAY_CONFIG = {
"mch_id": os.getenv("SSW_MCHID"),
"pay_key": os.getenv("SSW_PAY_KEY"),
"cert_path": os.getenv("SSW_PAY_SSL_CERT_PATH"),
"key_path": os.getenv("SSW_PAY_SSL_KEY_PATH"),
'callback_url': '/rent/wx_pay_callback',
'refund_callback_url': '/rent/refund_callback',
'domain': 'https://guide.ssw-htzn.com/v2/tour/'
}
HTZN_OA_CONFIG = {
"app_id": os.getenv("HTZN_OA_APPID"),
"app_secret": os.getenv("HTZN_OA_APPSECRET"),
"Token": os.getenv("HTZN_OA_TOKEN"),
"EncodingAESKey": os.getenv("HTZN_OA_AESKEY"),
"oa_id": os.getenv("HTZN_OA_ID")
}
XJWL_OA_CONFIG = {
"app_id": os.getenv("XJWL_OA_APPID"),
"app_secret": os.getenv("XJWL_OA_APPSECRET"),
"Token": os.getenv("XJWL_OA_TOKEN"),
"EncodingAESKey": os.getenv("XJWL_OA_AESKEY"),
"oa_id": os.getenv("XJWL_OA_ID")
}
XJWL_PAY_CONFIG = {
"mch_id": os.getenv("XJWL_MCHID"),
"pay_key": os.getenv("XJWL_PAY_KEY"),
"cert_path": os.getenv("XJWL_PAY_SSL_CERT_PATH"),
"key_path": os.getenv("XJWL_PAY_SSL_KEY_PATH"),
'callback_url': '/rent/wx_pay_callback',
'refund_callback_url': '/rent/refund_callback',
'domain': 'https://guide.ssw-htzn.com/v2/tour/'
}
SXXJ_OA_CONFIG = {
"app_id": os.getenv("SXXJ_OA_APPID"),
"app_secret": os.getenv("SXXJ_OA_APPSECRET"),
"Token": os.getenv("SXXJ_OA_TOKEN"),
"EncodingAESKey": os.getenv("SXXJ_OA_AESKEY"),
"oa_id": os.getenv("SXXJ_OA_ID")
}
pay_config_list = ["", "ssw", "xjwl"]
pay_config_dict = {
"ssw": SSW_PAY_CONFIG,
"xjwl": XJWL_PAY_CONFIG,
}
platform_config_list = ["", "xiaotu", "xiaodaoyou", "huituzhineng", "xiaojian", "shoujiyou", "sxxj_mp", "sxxj_oa",
"business_web"]
platform_config_dict = {
"xiaotu": XT_CONFIG,
"xiaodaoyou": XDY_MINI_PROGRAM_CONFIG,
"huituzhineng": HTZN_OA_CONFIG,
"xiaojian": XJWL_OA_CONFIG,
"shoujiyou": SJY_CONFIG,
"sxxj_mp": SXXJ_MINI_PROGRAM_CONFIG,
"sxxj_oa": SXXJ_OA_CONFIG,
"business_web": "business_web",
}
platform_appid_config_list = [
"",
XT_CONFIG["app_id"], # 晓兔小程序 平台序号 ==>1
XDY_MINI_PROGRAM_CONFIG["app_id"], # 小导游小程序 平台序号==>2
HTZN_OA_CONFIG["app_id"], # 灰兔智能公众号 平台序号==>3
XJWL_OA_CONFIG["app_id"], # 晓见文旅公众号 平台序号==> 4
SJY_CONFIG["app_id"], # 手机游小程序 平台序号==> 5
SXXJ_MINI_PROGRAM_CONFIG["app_id"], # 随心晓见小程序 平台序号==> 6
SXXJ_OA_CONFIG["app_id"], # 随心晓见公众号 平台序号==> 7
"business_web" # 商户PC平台 平台序号==> 8
]
platform_appid_config_dict = {
"xiaotu": XT_CONFIG["app_id"],
"xiaodaoyou": XDY_MINI_PROGRAM_CONFIG["app_id"],
"huituzhineng": HTZN_OA_CONFIG["app_id"],
"xiaojian": XJWL_OA_CONFIG["app_id"],
"shoujiyou": SJY_CONFIG["app_id"],
"sxxj_mp": SXXJ_MINI_PROGRAM_CONFIG["app_id"],
"sxxj_oa": SXXJ_OA_CONFIG["app_id"],
"business_web": "business_web"
}
oa_id_config = {
"gh_598e6bc39f09": HTZN_OA_CONFIG,
"gh_c3bea8443020": XJWL_OA_CONFIG,
"gh_b2a7d67e5686": SXXJ_OA_CONFIG,
}
oa_id_mini_program_dict = {
"gh_598e6bc39f09": "",
"gh_c3bea8443020": {"app_id": SJY_CONFIG["app_id"],
"thumb_media_id": "U0xslX7PT0MqDF6nvztA98zjPFNOx67c2TC4xelRNa4"},
"gh_b2a7d67e5686": {"app_id": SXXJ_MINI_PROGRAM_CONFIG["app_id"],
"thumb_media_id": "HJKoAr6Q8QIr90A8eifiXdjAg93KUlhFh8vpfjJqzT0"},
}
# 拙政园,留园的机柜从随心晓见公众号跳转的时候跳转到晓兔小程序
SXXJ_OA_TO_XT_MINI_PROGRAM_MAC_LIST = ["2004013201020005", "2004013201020006", "2004013201020008", "1909033201020171",
"1909033201020172", "1909033201020173", "1909033201020174", "1909033201020175",
"2004013201020007"
]
#!usr/bin/env python
#-*- coding:utf-8 _*-
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/25
@file: __init__.py.py
@function:
@modify:
"""
\ No newline at end of file
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: Base.py
@function:
@modify:
"""
from sqlalchemy import Column, func
from app import db
class BaseModel(db.Model):
__abstract__ = True
created_at = Column(db.TIMESTAMP, nullable=False, server_default=func.now())
updated_at = Column(db.TIMESTAMP, nullable=False, server_default=func.now(), onupdate=func.now())
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: Repairs.py
@function:
@modify:
"""
from sqlalchemy import Column, String, Integer
from models.Base import BaseModel
class PowerRepairModel(BaseModel):
__tablename__ = 'power_repair'
id = Column(Integer, primary_key=True)
power_sn = Column(String(20, 'utf8mb4_unicode_ci'), nullable=False, comment='sn')
nfc_id = Column(String(20, 'utf8mb4_unicode_ci'), nullable=False, comment='nfc_id')
repair_content = Column(String(40, 'utf8mb4_unicode_ci'), nullable=False, comment='维修内容')
repair_man = Column(String(20, 'utf8mb4_unicode_ci'), nullable=False, comment='维修人员')
index = Column(Integer, nullable=False, comment='维修次数')
status = Column(Integer, nullable=False, default=1, comment='1正常 -1删除')
# -*- coding: utf-8 -*-
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, func
db = SQLAlchemy(session_options={"autoflush": False})
class BaseModel(db.Model):
__abstract__ = True
created_at = Column(db.TIMESTAMP, nullable=False, server_default=func.now())
updated_at = Column(db.TIMESTAMP, nullable=False, server_default=func.now(), onupdate=func.now())
#! /usr/bin/.env python
from __future__ import print_function
import binascii
import logging
from smartcard.scard import *
import smartcard.util
logger = logging.getLogger(__name__)
class ScardService(object):
def __init__(self, reader=None):
if reader is None:
self.reader = "ACS ACR1281 1S Dual Reader PICC 0"
else:
self.reader = reader
def get_reader_state(self, state):
reader, eventstate, atr = state
logger.info(reader + " " + smartcard.util.toHexString(atr, smartcard.util.HEX))
if eventstate & SCARD_STATE_EMPTY:
message = 'Reader empty'
logger.info(message)
return 0, message
if eventstate & SCARD_STATE_PRESENT:
message = 'Card present in reader'
logger.info(message)
return 1, message
if eventstate & SCARD_STATE_ATRMATCH:
message = 'Card found'
logger.info(message)
return 2, message
if eventstate & SCARD_STATE_UNAWARE:
message = 'State unware'
logger.info(message)
return 3, message
if eventstate & SCARD_STATE_IGNORE:
message = 'Ignore reader'
logger.info(message)
return 4, message
if eventstate & SCARD_STATE_UNAVAILABLE:
message = 'Reader unavailable'
logger.info(message)
return 5, message
if eventstate & SCARD_STATE_EXCLUSIVE:
message = 'Card allocated for exclusive use by another application'
logger.info(message)
return 6, message
if eventstate & SCARD_STATE_INUSE:
message = 'Card in used by another application but can be shared'
logger.info(message)
return 7, message
if eventstate & SCARD_STATE_MUTE:
message = 'Card is mute'
logger.info(message)
return 8, message
if eventstate & SCARD_STATE_CHANGED:
message = 'State changed'
logger.info(message)
return 9, message
if eventstate & SCARD_STATE_UNKNOWN:
message = 'State unknowned'
logger.info(message)
return 10, message
return False
def establish_context(self):
hresult, hcontext = SCardEstablishContext(SCARD_SCOPE_USER)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to establish context: ' + SCardGetErrorMessage(hresult))
logger.info('Context established!')
return hcontext
def release_context(self, hcontext):
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' + SCardGetErrorMessage(hresult))
logger.info('Released context.')
def find_self_reader(self, hcontext):
hresult, readers = SCardListReaders(hcontext, [])
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to list readers: ' + SCardGetErrorMessage(hresult))
if self.reader not in readers:
raise error('can not find reader ==> ' + self.reader)
return True
def handle_status_change(self, hcontext, readerstates):
try:
hresult, readerstates = SCardGetStatusChange(hcontext, INFINITE, readerstates)
rs_code, message = self.get_reader_state(readerstates[0])
if rs_code == 0:
logger.info("请放入设备")
return
elif rs_code == 1:
self.card_transmit(hcontext, self.reader)
else:
logger.info("未知错位,请重试")
exit(1)
except error as e:
print(e)
finally:
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' + SCardGetErrorMessage(hresult))
print('Released context.')
def my_scard_handler(self):
try:
hresult, hcontext = SCardEstablishContext(SCARD_SCOPE_USER)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to establish context: ' + SCardGetErrorMessage(hresult))
print('Context established!')
try:
# 获取读卡器列表,筛选PICC类型是否存在
hresult, readers = SCardListReaders(hcontext, [])
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to list readers: ' + SCardGetErrorMessage(hresult))
if self.reader not in readers:
raise error('can not find reader ==> ' + self.reader)
readerstates = [(self.reader, SCARD_STATE_UNAWARE)]
while True:
hresult, readerstates = SCardGetStatusChange(hcontext, INFINITE, readerstates)
rs_code, message = self.get_reader_state(readerstates[0])
if rs_code == 0:
logger.info("请放入设备")
continue
elif rs_code == 1:
self.card_transmit(hcontext, self.reader)
else:
logger.info("未知错位,请重试")
exit(1)
finally:
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' + \
SCardGetErrorMessage(hresult))
print('Released context.')
except error as e:
print(e)
def card_transmit(self, hcontext, zreader, command):
GET_RESPONSE = [0xA0, 0xC0, 0x00, 0x00]
print('Trying to select DF_TELECOM of card in', zreader)
try:
hresult, hcard, dwActiveProtocol = SCardConnect(
hcontext,
zreader,
SCARD_SHARE_SHARED, SCARD_PROTOCOL_T0 | SCARD_PROTOCOL_T1)
if hresult != SCARD_S_SUCCESS:
raise error(
'Unable to connect: ' +
SCardGetErrorMessage(hresult))
print('Connected with active protocol', dwActiveProtocol)
try:
hresult, response = SCardTransmit(
hcard, dwActiveProtocol, command)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to transmit: ' +
SCardGetErrorMessage(hresult))
return False
print('Selected DF_TELECOM: ' +
smartcard.util.toHexString(
response, smartcard.util.HEX))
return response
finally:
hresult = SCardDisconnect(hcard, SCARD_UNPOWER_CARD)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to disconnect: ' +
SCardGetErrorMessage(hresult))
print('Disconnected')
except error as message:
print(error, message)
if __name__ == '__main__':
scard = ScardService()
try:
hcontext = scard.establish_context()
result = scard.find_self_reader(hcontext)
if not result:
print('connect_result',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
# for i in range(5):
# command = [0xFF, 0xB0, 0x00, 20, 0x10]
# command = [0xFF, 0xD6, 0x00, 20, 0x10, 00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B,
# 0x0C, 0x0D, 0x0E, 0x0F]
# result = scard.card_transmit(hcontext, scard.reader, command)
# if result:
# data = result[:16]
# return_code = result[-2:]
# else:
# print("1002 reade_history_error")
except error as e:
logger.info(e)
finally:
hresult = scard.release_context(hcontext)
logger.info('Released context.')
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
@author:Aeolus
@file: __init__.py
@function:
@modify:
"""
from flask import Flask
from flask_cors import CORS
from flask_log_request_id import RequestID
from dotenv import load_dotenv
from models.base_model import db
from utils.my_redis_cache import redis_client
from utils.mylogger import set_logger
def create_app(config_name):
from config.env_path_config import env_path
load_dotenv(dotenv_path=env_path, verbose=True, override=True)
set_logger()
app = Flask("repair_tool")
from config.app_config import config
app.config.from_object(config[config_name])
CORS(app)
RequestID(app)
return app
#!/usr/bin/.env python
import logging
import os
from threading import Lock
from flask import Flask, render_template, session, request
from flask_socketio import SocketIO, Namespace, emit, join_room, leave_room, \
close_room, rooms, disconnect
# Set this variable to "threading", "eventlet" or "gevent" to test the
# different async modes, or leave it set to None for the application to choose
# the best option based on installed packages.
from myapps.nfc.repair_tool.service.scard_service import ScardService
from myapps.nfc.repair_tool.socket_server import create_app
from smartcard.scard import error as scard_error, SCARD_S_SUCCESS, SCardGetErrorMessage
logger = logging.getLogger(__name__)
async_mode = 'threading'
app = create_app(os.getenv('RUN_ENV', 'prod'))
socketio = SocketIO(app, cors_allowed_origins="*", logger=True, engineio_logger=True)
thread = None
thread_lock = Lock()
@app.route("/", methods=["GET"])
def index():
socketio.emit("my_response", {'data': 'index'})
def background_thread():
"""Example of how to send server generated events to clients."""
count = 0
while True:
socketio.sleep(10)
count += 1
socketio.emit('my_response',
{'data': 'Server generated event', 'count': count},
namespace='/test')
class MyNamespace(Namespace):
def __init__(self, namespace=None):
super(Namespace, self).__init__(namespace)
self.scard = ScardService()
def on_reader_connect(self, message):
try:
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if result:
emit('reader_connect', {'data': True, 'error_code': 0, 'error_message': 'success'})
else:
emit('reader_connect',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
except scard_error as e:
logger.info(e)
finally:
hresult = self.scard.release_context(hcontext)
logger.info('Released context.')
def on_read_history(self, message):
try:
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if not result:
emit('read_history',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
history_data = []
for i in range(5):
command = [0xFF, 0xB0, 0x00, 10 + i, 0x10]
result = self.scard.card_transmit(hcontext, self.scard.reader, command)
if result:
data = result[:16]
return_code = result[-2:]
if data[0] == 0:
history_data.append({})
else:
data = [str(i) for i in data]
repair_date = ''.join(data[:8])
repair_content = int(''.join(data[8:10]))
repair_man = int(''.join(data[10:12]))
history_data.append(
{'repair_date': repair_date, 'repair_content': repair_content, 'repair_man': repair_man})
else:
emit('read_history',
{'data': None, 'error_code': 1002, 'error_message': '读取历史数据失败连接错误, reader connect error'})
emit('read_history',
{'data': history_data, 'error_code': 0, 'error_message': 'success'})
except scard_error as e:
logger.info(e)
finally:
hresult = self.scard.release_context(hcontext)
logger.info('Released context.')
def on_write_repair(self, message):
try:
repair_date = message["repair_date"]
repair_content = str(message["repair_content"]).zfill(2)
repair_man = str(message["repair_man"]).zfill(2)
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if not result:
emit('write_repair',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
history_data = []
for i in range(5):
read_command = [0xFF, 0xB0, 0x00, 10 + i, 0x10]
result = self.scard.card_transmit(hcontext, self.scard.reader, read_command)
if result:
data = result[:16]
return_code = result[-2:]
if data[0] == 0:
write_command = [0xFF, 0xD6, 0x00, 10 + i, 0x10]
write_command += [int(x) for x in repair_date]
write_command += [int(x) for x in repair_content]
write_command += [int(x) for x in repair_man]
write_command += [0, 0, 0, 0]
result = self.scard.card_transmit(hcontext, self.scard.reader, write_command)
return_code = result[-2:]
if return_code[0] == 144 and return_code[1] == 0:
pass
else:
emit('write_repair',
{'data': None, 'error_code': 1003,
'error_message': '写入维修数据失败,请重试, write repair data error'})
break
emit('write_repair',
{'data': history_data, 'error_code': 0, 'error_message': 'success'})
except scard_error as e:
logger.info(e)
finally:
hresult = self.scard.release_context(hcontext)
logger.info('Released context.')
def on_read_sn(self, message):
try:
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if not result:
emit('read_sn',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
read_command = [0xFF, 0xB0, 0x00, 17, 0x10]
result = self.scard.card_transmit(hcontext, self.scard.reader, read_command)
if result:
data = result[:16]
return_code = result[-2:]
if return_code[0] == 144 and return_code[1] == 0:
sn = [hex(i).replace('0x', '').upper() for i in data]
sn = ''.join(sn)
sn = '07' + sn
else:
emit('write_repair',
{'data': None, 'error_code': 1003,
'error_message': '写入维修数据失败,请重试, write repair data error'})
emit('write_repair',
{'data': sn, 'error_code': 0, 'error_message': 'success'})
except scard_error as e:
logger.info(e)
finally:
hresult = self.scard.release_context(hcontext)
logger.info('Released context.')
def on_write_sn(self, message):
try:
sn = message["sn"]
last_four = hex(int(sn[-4:])).replace('0x', '').zfill(4)
sn = sn[:-4] + last_four
# TODO 校验sn是否重复
#
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if not result:
emit('write_sn',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
sn = sn[2:]
write_command = [0xFF, 0xD6, 0x00, 17, 0x10]
write_command += [int(i, 16) for i in sn]
result = self.scard.card_transmit(hcontext, self.scard.reader, write_command)
return_code = result[-2:]
if return_code[0] == 144 and return_code[1] == 0:
pass
else:
emit('write_sn',
{'data': None, 'error_code': 1004,
'error_message': '写入sn数据失败,请重试, write repair data error'})
emit('write_sn',
{'data': None, 'error_code': 0, 'error_message': 'success'})
except scard_error as e:
logger.info(e)
finally:
hresult = self.scard.release_context(hcontext)
logger.info('Released context.')
# 广播事件,接收消息后广播
def on_my_broadcast_event(self, message):
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': message['data'], 'count': session['receive_count']},
broadcast=True)
def on_ping(self):
emit('my_pong')
def on_connect(self, auth):
if request.remote_addr == '127.0.0.1':
pass
else:
print(auth)
if auth["token"] != '1234':
disconnect()
else:
print("connect success")
emit('my_response', {'data': 'Connected', 'count': 0})
def on_disconnect(self):
print('Client disconnected', request.sid)
name_space = MyNamespace()
socketio.on_namespace(name_space)
if __name__ == '__main__':
socketio.run(app)
#! /usr/bin/env python3
"""
Sample for python PCSC wrapper module: send a Control Code to a card or
reader
__author__ = "Ludovic Rousseau"
Copyright 2007-2010 Ludovic Rousseau
Author: Ludovic Rousseau, mailto:ludovic.rousseau@free.fr
This file is part of pyscard.
pyscard is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
pyscard is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with pyscard; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
from smartcard.scard import *
from smartcard.util import toBytes
try:
hresult, hcontext = SCardEstablishContext(SCARD_SCOPE_USER)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to establish context: ' + SCardGetErrorMessage(hresult))
print('Context established!')
try:
hresult, readers = SCardListReaders(hcontext, [])
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to list readers: ' + SCardGetErrorMessage(hresult))
print('PCSC Readers:', readers)
if len(readers) < 1:
raise error('No smart card readers')
for zreader in readers:
print('Trying to Control reader:', zreader)
try:
hresult, hcard, dwActiveProtocol = SCardConnect(
hcontext, zreader, SCARD_SHARE_DIRECT, SCARD_PROTOCOL_T0)
if hresult != SCARD_S_SUCCESS:
raise error(
'Unable to connect: ' + SCardGetErrorMessage(hresult))
print('Connected with active protocol', dwActiveProtocol)
try:
if 'winscard' == resourceManager:
# IOCTL_SMARTCARD_GET_ATTRIBUTE = SCARD_CTL_CODE(2)
hresult, response = SCardControl(
hcard,
SCARD_CTL_CODE(2),
toBytes("%.8lx" % SCARD_ATTR_VENDOR_NAME))
if hresult != SCARD_S_SUCCESS:
raise error(
'SCardControl failed: ' +
SCardGetErrorMessage(hresult))
r = ""
for i in range(len(response)):
r += "%c" % response[i]
print('SCARD_ATTR_VENDOR_NAME:', r)
elif 'pcsclite' == resourceManager:
# get firmware on Gemplus readers
hresult, response = SCardControl(
hcard,
SCARD_CTL_CODE(1),
[0x02])
if hresult != SCARD_S_SUCCESS:
raise error(
'SCardControl failed: ' +
SCardGetErrorMessage(hresult))
r = ""
for i in range(len(response)):
r += "%c" % response[i]
print('Control:', r)
finally:
hresult = SCardDisconnect(hcard, SCARD_UNPOWER_CARD)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to disconnect: ' +
SCardGetErrorMessage(hresult))
print('Disconnected')
except error as message:
print(error, message)
finally:
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' +
SCardGetErrorMessage(hresult))
print('Released context.')
except error as e:
print(e)
import sys
if 'win32' == sys.platform:
print('press Enter to continue')
sys.stdin.read(1)
\ No newline at end of file
#! /usr/bin/.env python
from __future__ import print_function
from smartcard.scard import *
import smartcard.util
srTreeATR = [0x3B, 0x77, 0x94, 0x00, 0x00, 0x82, 0x30, 0x00, 0x13, 0x6C, 0x9F, 0x22]
srTreeMask = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]
class ScardService(object):
def __init__(self):
self.reader = "ACS ACR1281 1S Dual Reader PICC 0"
def printstate(self, state):
reader, eventstate, atr = state
print(reader + " " + smartcard.util.toHexString(atr, smartcard.util.HEX))
if eventstate & SCARD_STATE_ATRMATCH:
print('\tCard found')
if eventstate & SCARD_STATE_UNAWARE:
print('\tState unware')
if eventstate & SCARD_STATE_IGNORE:
print('\tIgnore reader')
if eventstate & SCARD_STATE_UNAVAILABLE:
print('\tReader unavailable')
if eventstate & SCARD_STATE_EMPTY:
print('\tReader empty')
if eventstate & SCARD_STATE_PRESENT:
print('\tCard present in reader')
return True
if eventstate & SCARD_STATE_EXCLUSIVE:
print('\tCard allocated for exclusive use by another application')
if eventstate & SCARD_STATE_INUSE:
print('\tCard in used by another application but can be shared')
if eventstate & SCARD_STATE_MUTE:
print('\tCard is mute')
if eventstate & SCARD_STATE_CHANGED:
print('\tState changed')
if eventstate & SCARD_STATE_UNKNOWN:
print('\tState unknowned')
return False
def my_scard_handler(self):
try:
hresult, hcontext = SCardEstablishContext(SCARD_SCOPE_USER)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to establish context: ' + SCardGetErrorMessage(hresult))
print('Context established!')
try:
hresult, readers = SCardListReaders(hcontext, [])
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to list readers: ' + SCardGetErrorMessage(hresult))
if self.reader not in readers:
raise error('can not find reader ==> ' + self.reader)
readerstates = [(self.reader, SCARD_STATE_UNAWARE)]
# for i in range(len(readers)):
# readerstates += [(readers[i], SCARD_STATE_UNAWARE)]
print('----- Current reader and card states are: -------')
hresult, newstates = SCardGetStatusChange(hcontext, 0, readerstates)
for i in newstates:
self.printstate(i)
print('----- Please insert or remove a card ------------')
hresult, newstates = SCardGetStatusChange(
hcontext,
INFINITE,
newstates)
print('----- New reader and card states are: -----------')
for i in newstates:
self.printstate(i)
finally:
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' + \
SCardGetErrorMessage(hresult))
print('Released context.')
except error as e:
print(e)
if __name__ == '__main__':
service = ScardService()
service.my_scard_handler()
#! /usr/bin/.env python
from __future__ import print_function
from smartcard.scard import *
import smartcard.util
class ScardService(object):
def __init__(self):
self.reader = "ACS ACR1281 1S Dual Reader PICC 0"
def printstate(self, state):
reader, eventstate, atr = state
print(reader + " " + smartcard.util.toHexString(atr, smartcard.util.HEX))
if eventstate & SCARD_STATE_ATRMATCH:
print('\tCard found')
if eventstate & SCARD_STATE_UNAWARE:
print('\tState unware')
if eventstate & SCARD_STATE_IGNORE:
print('\tIgnore reader')
if eventstate & SCARD_STATE_UNAVAILABLE:
print('\tReader unavailable')
if eventstate & SCARD_STATE_EMPTY:
print('\tReader empty')
return -1
if eventstate & SCARD_STATE_PRESENT:
print('\tCard present in reader')
return 1
if eventstate & SCARD_STATE_EXCLUSIVE:
print('\tCard allocated for exclusive use by another application')
if eventstate & SCARD_STATE_INUSE:
print('\tCard in used by another application but can be shared')
if eventstate & SCARD_STATE_MUTE:
print('\tCard is mute')
if eventstate & SCARD_STATE_CHANGED:
print('\tState changed')
if eventstate & SCARD_STATE_UNKNOWN:
print('\tState unknowned')
return False
def my_scard_handler(self):
try:
hresult, hcontext = SCardEstablishContext(SCARD_SCOPE_USER)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to establish context: ' + SCardGetErrorMessage(hresult))
print('Context established!')
try:
# 获取读卡器列表,筛选PICC类型是否存在
hresult, readers = SCardListReaders(hcontext, [])
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to list readers: ' + SCardGetErrorMessage(hresult))
if self.reader not in readers:
raise error('can not find reader ==> ' + self.reader)
readerstates = [(self.reader, SCARD_STATE_UNAWARE)]
# for i in range(len(readers)):
# readerstates += [(readers[i], SCARD_STATE_UNAWARE)]
# print('----- Current reader and card states are: -------')
# hresult, newstates = SCardGetStatusChange(hcontext, INFINITE, readerstates)
# for i in newstates:
# self.printstate(i)
#
# print('----- Please insert or remove a card ------------')
# hresult, newstates = SCardGetStatusChange(
# hcontext,
# INFINITE,
# newstates)
#
# print('----- New reader and card states are: -----------')
# for i in newstates:
# self.printstate(i)
print('----- Please insert or remove a card ------------')
newstates = readerstates
while True:
hresult, newstates = SCardGetStatusChange(hcontext, INFINITE, newstates)
result = self.printstate(newstates[0])
if result == -1:
print("请放入设备")
continue
elif result == 1:
card_transmit(hcontext, self.reader)
else:
print("未知错位,请重试")
exit(1)
finally:
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' + \
SCardGetErrorMessage(hresult))
print('Released context.')
except error as e:
print(e)
def card_transmit(hcontext, zreader):
SELECT = [0xFF, 0xB0, 0x00]
DF_TELECOM = [0x10, 0x10]
GET_RESPONSE = [0xA0, 0xC0, 0x00, 0x00]
print('Trying to select DF_TELECOM of card in', zreader)
try:
hresult, hcard, dwActiveProtocol = SCardConnect(
hcontext,
zreader,
SCARD_SHARE_SHARED, SCARD_PROTOCOL_T0 | SCARD_PROTOCOL_T1)
if hresult != SCARD_S_SUCCESS:
raise error(
'Unable to connect: ' +
SCardGetErrorMessage(hresult))
print('Connected with active protocol', dwActiveProtocol)
try:
hresult, response = SCardTransmit(
hcard, dwActiveProtocol, SELECT + DF_TELECOM)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to transmit: ' +
SCardGetErrorMessage(hresult))
print('Selected DF_TELECOM: ' +
smartcard.util.toHexString(
response, smartcard.util.HEX))
# hresult, response = SCardTransmit(
# hcard,
# dwActiveProtocol,
# GET_RESPONSE + [response[1]])
# if hresult != SCARD_S_SUCCESS:
# raise error(
# 'Failed to transmit: ' +
# SCardGetErrorMessage(hresult))
# print('GET_RESPONSE after SELECT DF_TELECOM: ' +
# smartcard.util.toHexString(
# response, smartcard.util.HEX))
finally:
hresult = SCardDisconnect(hcard, SCARD_UNPOWER_CARD)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to disconnect: ' +
SCardGetErrorMessage(hresult))
print('Disconnected')
except error as message:
print(error, message)
if __name__ == '__main__':
service = ScardService()
service.my_scard_handler()
#! /usr/bin/env python3
"""
Sample for python PCSC wrapper module: Detect card insertion/removal
__author__ = "http://www.gemalto.com"
Copyright 2001-2012 gemalto
Author: Jean-Daniel Aussel, mailto:jean-daniel.aussel@gemalto.com
Copyright 2010 Ludovic Rousseau
Author: Ludovic Rousseau, mailto:ludovic.rousseau@free.fr
This file is part of pyscard.
pyscard is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
pyscard is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with pyscard; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
from smartcard.scard import *
import smartcard.util
srTreeATR = \
[0x3B, 0x77, 0x94, 0x00, 0x00, 0x82, 0x30, 0x00, 0x13, 0x6C, 0x9F, 0x22]
srTreeMask = \
[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]
def printstate(state):
reader, eventstate, atr = state
print(reader + " " + smartcard.util.toHexString(atr, smartcard.util.HEX))
if eventstate & SCARD_STATE_ATRMATCH:
print('\tCard found')
if eventstate & SCARD_STATE_UNAWARE:
print('\tState unware')
if eventstate & SCARD_STATE_IGNORE:
print('\tIgnore reader')
if eventstate & SCARD_STATE_UNAVAILABLE:
print('\tReader unavailable')
if eventstate & SCARD_STATE_EMPTY:
print('\tReader empty')
if eventstate & SCARD_STATE_PRESENT:
print('\tCard present in reader')
if eventstate & SCARD_STATE_EXCLUSIVE:
print('\tCard allocated for exclusive use by another application')
if eventstate & SCARD_STATE_INUSE:
print('\tCard in used by another application but can be shared')
if eventstate & SCARD_STATE_MUTE:
print('\tCard is mute')
if eventstate & SCARD_STATE_CHANGED:
print('\tState changed')
if eventstate & SCARD_STATE_UNKNOWN:
print('\tState unknowned')
try:
hresult, hcontext = SCardEstablishContext(SCARD_SCOPE_USER)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to establish context: ' +
SCardGetErrorMessage(hresult))
print('Context established!')
try:
hresult, readers = SCardListReaders(hcontext, [])
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to list readers: ' +
SCardGetErrorMessage(hresult))
print('PCSC Readers:', readers)
readerstates = []
for i in range(len(readers)):
readerstates += [(readers[i], SCARD_STATE_UNAWARE)]
print('----- Current reader and card states are: -------')
hresult, newstates = SCardGetStatusChange(hcontext, 0, readerstates)
for i in newstates:
printstate(i)
print('----- Please insert or remove a card ------------')
hresult, newstates = SCardGetStatusChange(
hcontext,
INFINITE,
newstates)
print('----- New reader and card states are: -----------')
for i in newstates:
printstate(i)
finally:
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' +
SCardGetErrorMessage(hresult))
print('Released context.')
import sys
if 'win32' == sys.platform:
print('press Enter to continue')
sys.stdin.read(1)
except error as e:
print(e)
\ No newline at end of file
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/06/17
@file: __init__.py.py
@function:
@modify:
"""
import os
from pathlib import Path
from dotenv import load_dotenv
from flask import Flask
from flask_cors import CORS
from flask_log_request_id import RequestID
from models.base_model import db
from utils.my_redis_cache import redis_client
from utils.mylogger import set_logger
def create_app(config_name):
from config.env_path_config import env_path
load_dotenv(dotenv_path=env_path, verbose=True, override=True)
set_logger()
app = Flask('nfc_socket')
from config.app_config import config
app.config.from_object(config[config_name])
CORS(app, supports_credentials=True) # HTTP/HTTPS 跨域
db.init_app(app)
# redis_client.init_app(app)
RequestID(app)
from utils.middlewares import log_enter_interface, jwt_authentication, log_out_interface, close_db_session, \
all_options_pass
app.before_request(log_enter_interface)
app.before_request(all_options_pass)
app.before_request(jwt_authentication)
app.after_request(log_out_interface)
app.after_request(close_db_session)
# todo register blueprint
from myapps.nfc.socket_server.api.account_portal import account
app.register_blueprint(account, url_prefix="/nfc/account")
return app
if __name__ == '__main__':
print(os.path.abspath('../../../config/.env'))
# -*- coding: utf-8 -*-
import datetime
import logging
import time
from flask import Blueprint, jsonify, request, g
from config.commen_config import LOGIN_TYPE
from models.base_model import db
from models.user_models import NfcAccountModel, NfcAccountLogRecord
from service.sms_service import SMSService
from utils.error_code.auth_error import PHONE_NOT_NULL_ERROR, PHONE_NOT_VALID_ERROR, TOKEN_EXPIRE_ERROR, \
TOKEN_NOT_VALID_ERROR, VERIFICATION_CODE_INVALID_ERROR, VERIFICATION_CODE_ERROR
from utils.jwt_util import verify_jwt, generate_jwt
from utils.my_response import BaseResponse
logger = logging.getLogger(__name__)
account = Blueprint('account', __name__)
@account.route('/sendCode', methods=['GET', 'POST'])
def send_code():
json_data = request.get_json()
cur_ip = request.remote_addr
phone = json_data['phone'] if 'phone' in json_data else None
if not phone:
return BaseResponse(**PHONE_NOT_NULL_ERROR)
logger.info(phone)
# 判断该手机号是否再数据库中,不在返回无权限登录
account = NfcAccountModel.query.filter_by(phone=phone, status=1).first()
if not account:
return BaseResponse(**PHONE_NOT_VALID_ERROR)
sms = SMSService()
result = sms.phoneSendCode(phone, 520391, '灰兔智能')
logger.info(result)
agent_log = NfcAccountLogRecord()
agent_log.phone = phone
agent_log.ip = cur_ip
agent_log.last_login = datetime.datetime.now()
agent_log.login_type = LOGIN_TYPE['send_code']
agent_log.created_at = datetime.datetime.now()
agent_log.updated_at = datetime.datetime.now()
db.session.add(agent_log)
db.session.commit()
return BaseResponse()
@account.route('/login', methods=['GET', 'POST'])
def login():
token = request.headers.get('token')
cur_ip = request.remote_addr
json_data = request.get_json()
data = {}
phone = json_data['phone'] if 'phone' in json_data else None
code = json_data['code'] if 'code' in json_data else None
login_type = json_data['type'] if 'type' in json_data else 1 # 1.验证码登录,2.密码登录
if token:
# token登录
payload = verify_jwt(token)
# "判断token的校验结果"
if not payload:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
else:
# "获取载荷中的信息赋值给g对象"
user_id = payload.get('user_id')
if not user_id:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
user_info = NfcAccountModel.query.filter_by(id=user_id, status=1).first()
if not user_info:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
else:
if login_type == 1:
# 验证码登录
# 判断验证码是否正确
sms = SMSService()
res = sms.verificate(phone, code)
if res == -1:
return BaseResponse(**VERIFICATION_CODE_INVALID_ERROR)
elif res == -2:
return BaseResponse(**VERIFICATION_CODE_ERROR)
user_info = NfcAccountModel.query.filter_by(phone=phone, status=1).first()
if not user_info:
return BaseResponse(**PHONE_NOT_VALID_ERROR)
new_token = generate_jwt(payload={"user_id": user_info.id}, expiry=time.time() + 24 * 60 * 60)
user_info.access_token = new_token
db.session.add(user_info)
agent_log = NfcAccountLogRecord()
agent_log.phone = user_info.phone
agent_log.ip = cur_ip
agent_log.last_login = user_info.last_login
agent_log.login_type = LOGIN_TYPE['token_login']
agent_log.created_at = datetime.datetime.now()
agent_log.updated_at = datetime.datetime.now()
db.session.add(agent_log)
db.session.commit()
data['token'] = new_token
data['user_name'] = user_info.user_name
data['phone'] = user_info.phone
return BaseResponse(data=data)
@account.route('/test', methods=['GET', 'POST'])
def my_test():
return BaseResponse()
#!/usr/bin/.env python
import os
from threading import Lock
from flask import Flask, render_template, session, request
from flask_socketio import SocketIO, Namespace, emit, join_room, leave_room, \
close_room, rooms, disconnect
# Set this variable to "threading", "eventlet" or "gevent" to test the
# different async modes, or leave it set to None for the application to choose
# the best option based on installed packages.
from myapps.nfc.socket_server import create_app
async_mode = 'threading'
app = create_app(os.getenv('RUN_ENV', 'prod'))
socketio = SocketIO(app, cors_allowed_origins="*", logger=True, engineio_logger=True)
thread = None
thread_lock = Lock()
@app.route("/", methods=["GET"])
def index():
socketio.emit("my_response", {'data': 'index'})
def background_thread():
"""Example of how to send server generated events to clients."""
count = 0
while True:
socketio.sleep(10)
count += 1
socketio.emit('my_response',
{'data': 'Server generated event', 'count': count},
namespace='/test')
class MyNamespace(Namespace):
def on_my_event(self, message):
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': message['data'], 'count': session['receive_count']})
def on_new_rent(self, message):
print("...")
def on_my_message(self, message):
print(message)
print(request.remote_addr)
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': message})
def on_my_broadcast_event(self, message):
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': message['data'], 'count': session['receive_count']},
broadcast=True)
def on_join(self, message):
join_room(message['room'])
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': 'In rooms: ' + ', '.join(rooms()),
'count': session['receive_count']})
def on_leave(self, message):
leave_room(message['room'])
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': 'In rooms: ' + ', '.join(rooms()),
'count': session['receive_count']})
def on_close_room(self, message):
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response', {'data': 'Room ' + message['room'] + ' is closing.',
'count': session['receive_count']},
room=message['room'])
close_room(message['room'])
def on_my_room_event(self, message):
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': message['data'], 'count': session['receive_count']},
room=message['room'])
def on_disconnect_request(self):
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': 'Disconnected!', 'count': session['receive_count']})
disconnect()
def on_ping(self):
emit('my_pong')
def on_connect(self, auth):
if request.remote_addr == '127.0.0.1':
pass
else:
if auth["token"] != '1234':
disconnect()
emit('my_response', {'data': 'Connected', 'count': 0})
def on_disconnect(self):
print('Client disconnected', request.sid)
socketio.on_namespace(MyNamespace('/'))
if __name__ == '__main__':
socketio.run(app)
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
async_mode = 'eventlet'
socketio = SocketIO(app, cors_allowed_origins="*", logger=True, engineio_logger=True)
@app.route('/')
def index():
return render_template('index.html')
# @socketio.event
# def my_event(message):
# print("==================")
# emit('my response', {'data': 'got it!'})
@socketio.on('mymessage', namespace='/test')
def handle_message(data):
print('30 received message: ')
print(data)
result = emit('my_response', {'code': '200', 'msg': 'start to process...'}, namespace='/test')
print(result)
@socketio.on('connect', namespace='/test')
def handle_connect(data):
print('30 received connect: ')
print(data)
result = emit('my_response', {'code': '200', 'msg': 'start to process...'}, namespace='/test')
print(result)
if __name__ == '__main__':
socketio.run(app)
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/06/14
@file: socketio_client.py
@function:
@modify:
"""
import socketio
sio = socketio.Client(logger=True, engineio_logger=True)
# sio = socketio.Client()
@sio.event()
def connect():
print('connection established')
# sio.emit('my_message', {'response': 'my response'}, namespace='/test')
@sio.on('my_response')
def on_message(data):
print('I received a my_response!')
print(data)
@sio.on('reader_connect')
def on_message(data):
print('I received a reader_connect!')
print(data)
@sio.on('read_history')
def on_message(data):
print('I received a read_history!')
print(data)
@sio.on('write_repair')
def on_message(data):
print('I received a write_repair!')
print(data)
@sio.on('write_sn')
def on_message(data):
print('I received a write_sn!')
print(data)
@sio.on('read_sn')
def on_message(data):
print('I received a read_sn!')
print(data)
@sio.event
def disconnect():
print('disconnected from server')
sio.connect('http://127.0.0.1:5000', auth={"token": "1234"})
# sio.connect('http://127.0.0.1:5000')
# sio.wait()
# # sio.emit('my_message', {'response': 'my response'})
# sio.emit('write_repair', {'repair_date': '20210826', 'repair_content': 1, 'repair_man': 6})
# sio.emit('write_repair', {'repair_date': '20210827', 'repair_content': 2, 'repair_man': 7})
# sio.emit('write_repair', {'repair_date': '20210828', 'repair_content': 3, 'repair_man': 8})
# sio.emit('write_repair', {'repair_date': '20210829', 'repair_content': 4, 'repair_man': 9})
# sio.emit('write_repair', {'repair_date': '20210830', 'repair_content': 5, 'repair_man': 10})
# sio.emit('read_history', {'repair_date': '20210826', 'repair_content': '01', 'repair_man': '01'})
# sio.emit('write_sn', {'sn': '07E2E3070A01411104', 'repair_content': '01', 'repair_man': '01'})
sio.emit('read_sn', {'sn': '07E2E3070A01411104', 'repair_content': '01', 'repair_man': '01'})
# sio.emit('reader_connect', {'sn': '07E2E3070A01411104', 'repair_content': '01', 'repair_man': '01'})
sio.wait()
# sio.
#!usr/bin/env python
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/03/25
@file: app.py.py
@function:
@modify:
@author:Aeolus
"""
import os
import logging
from app import create_app
from myapps.nfc.repair_tool.socket_server import create_app
logger = logging.getLogger(__name__)
app = create_app('production')
# app = create_app(os.getenv('RUN_ENV', 'development'))
app = create_app(os.getenv('RUN_ENV', 'prod'))
logger.info("run server")
app.run('127.0.0.1', 8888, debug=True)
if __name__ == '__main__':
app.run('127.0.0.1', 8888, debug=True)
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: __init__.py.py
"""
\ No newline at end of file
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: __init__.py.py
"""
# "添加请求钩子"
from utils.middlewares import jwt_authentication
#!usr/bin/env python
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
......@@ -8,7 +8,6 @@ author:Aeolus
@function:
@modify:
"""
import jwt
from flask import current_app
......@@ -25,10 +24,10 @@ def generate_jwt(payload, expiry, secret=None):
_payload.update(payload)
if not secret:
secret = current_app.config['JWT_SECRET']
secret = current_app.config['SECRET_KEY']
token = jwt.encode(_payload, secret, algorithm='HS256')
return token.decode()
return token
def verify_jwt(token, secret=None):
......@@ -39,11 +38,22 @@ def verify_jwt(token, secret=None):
:return: dict: payload
"""
if not secret:
secret = current_app.config['JWT_SECRET']
secret = current_app.config['SECRET_KEY']
try:
payload = jwt.decode(token, secret, algorithm=['HS256'])
payload = jwt.decode(token, secret, algorithms=['HS256'])
except jwt.PyJWTError:
payload = None
return payload
if __name__ == '__main__':
import time
token = generate_jwt({"user_id": 163}, time.time() + 600, 'qwer1234')
# for i in range(10):
# result = verify_jwt(token, 'secret')
# print(result)
# print(time.time())
# time.sleep(1)
#!usr/bin/env python
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
......@@ -8,14 +8,45 @@ author:Aeolus
@function:
@modify:
"""
import logging
from flask import g, request, url_for, current_app, make_response, jsonify
from flask import g, request
from config.wechat_config import platform_config_list
from service.agent_service import AgentService
from utils.error_code.auth_error import TOKEN_NOT_VALID_ERROR, TOKEN_EXPIRE_ERROR
from models.user_models import CustomerModel, AgentAccount
from utils.my_response import BaseResponse
from utils.jwt_util import verify_jwt
logger = logging.getLogger(__name__)
def log_enter_interface():
"""
日志打印进入接口
:return:
"""
logger.info("######################### 进入 {} 接口 ################################ ".format(request.path))
def log_out_interface(environ):
"""
日志打印退出接口
:return:
"""
logger.info("######################### 退出 {} 接口 ################################\n".format(request.path))
return environ
def close_db_session(environ):
from models.base_model import db
db.session.close()
return environ
from . import jwt_util
"""用户认证机制==>每次请求前获取并校验token"""
"@app.before_request 不使@调用装饰器 在 init文件直接装饰"
"@myapps.before_request 不使@调用装饰器 在 init文件直接装饰"
def jwt_authentication():
......@@ -25,14 +56,108 @@ def jwt_authentication():
3.使用jwt模块进行校验
4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存
"""
auth = request.headers.get('Authorization')
if auth and auth.startswith('Bearer '):
"提取token 0-6 被Bearer和空格占用 取下标7以后的所有字符"
token = auth[7:]
"校验token"
payload = jwt_util.verify_jwt(token)
"判断token的校验结果"
if current_app.name == "nfc_socket":
NO_AUTH_CHECK_URL = [url_for('account.login'), ]
elif current_app.name == "business_mp":
NO_AUTH_CHECK_URL = [url_for('account.test'), url_for('account.login'), url_for('account.send_code'),
url_for('tool.tool_reset')]
if request.path not in NO_AUTH_CHECK_URL:
token = request.headers.get('token')
if not token:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
result = AgentService.check_agent_token(token)
if result == 1:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
elif result == 2:
return BaseResponse(**TOKEN_EXPIRE_ERROR)
else:
g.user = result
logger.info("本次登录用户名称:{0},手机号:{1},类型:{2}".format(result.user_name, result.phone, result.level_desc))
return
elif current_app.name == "business_web":
NO_AUTH_CHECK_URL = [url_for('account.user_login'), url_for('account.send_code')]
if request.path not in NO_AUTH_CHECK_URL:
token = request.headers.get('Authorization')
# "校验token"
payload = verify_jwt(token)
# "判断token的校验结果"
if payload:
"获取载荷中的信息赋值给g对象"
g.user_id = payload.get('user_id')
g.refresh = payload.get('refresh')
# "获取载荷中的信息赋值给g对象"
user_id = payload.get('user_id')
if not user_id:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
result = AgentAccount.query.filter_by(id=user_id, status=1).first()
if not result:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
g.user = result
else:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
return
elif current_app.name == "onenet":
NO_AUTH_CHECK_URL = [url_for('onenet.message_handler'), ]
elif current_app.name == "rent_core":
NO_AUTH_CHECK_URL = [url_for('wx_auth.test'),
url_for('wx_auth.mini_login'),
url_for('rent.wx_pay_callback'),
url_for('rent.test'),
url_for('rent.run_scan'),
url_for('message.message_put'),
url_for('ctrip.run_ctrip'),
url_for('coupon.openid_test'),
url_for('coupon.openid_callback'),
url_for('alltour.wx_oa_entrance'),
url_for('xiaojianwenlv.wx_oa_entrance'),
url_for('xiaojianwenlv.get_wx_media_list'),
url_for('suixinxiaojian.wx_oa_entrance'),
url_for('suixinxiaojian.get_wx_media_list'),
url_for('redpack.send_redpack'),
url_for('spot.get_spot_by_lat_lng'),
url_for('machine.get_machine_by_lat_lng'),
url_for('rent.get_machine_price'),
url_for('other.get_spot_phone'),
url_for('other.login'),
url_for('other.send_code'),
url_for('onenet.message_handler'), ]
else:
NO_AUTH_CHECK_URL = []
if request.path not in NO_AUTH_CHECK_URL:
token = request.headers.get('Authorization')
# "校验token"
payload = verify_jwt(token)
# "判断token的校验结果"
if payload:
# "获取载荷中的信息赋值给g对象"
user_id = payload.get('user_id')
if not user_id:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
g.user = CustomerModel.query.filter_by(id=user_id).first()
else:
return BaseResponse(**TOKEN_NOT_VALID_ERROR)
def get_platform():
"""
:return:
"""
platform = request.headers.get('platform', "xiaotu")
g.platform = platform_config_list.index(platform)
def all_options_pass():
"""
:return:
"""
if request.method == "OPTIONS":
headers = {'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST',
'Access-Control-Allow-Headers':
'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , platform',
}
return make_response((jsonify({'error_code': 0}), 200, headers))
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: my_redis_cache.py
"""
from flask_redis import FlaskRedis
redis_client = FlaskRedis(config_prefix='TENCENT_REDIS')
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: my_response.py
@function:
@modify:
"""
from flask import Response
from flask.json import dumps
class BaseResponse(Response):
def __init__(self, data=None, error_code=0, error_message='Success', *args, **kwargs):
if data is not None:
result = dumps(dict(data=data, error_code=error_code, error_message=error_message, *args, **kwargs))
else:
result = dumps(dict(error_code=error_code, error_message=error_message, *args, **kwargs))
Response.__init__(self, result, mimetype='application/json')
......@@ -3,8 +3,7 @@ import logging
from flask_log_request_id import RequestIDLogFilter
class MyLogger(object):
def init_app(self, app):
def set_logger():
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(
......@@ -12,5 +11,5 @@ class MyLogger(object):
"%(asctime)s - %(filename)s - %(funcName)s -[line:%(lineno)d] - %(levelname)s - request_id=%(request_id)s: %(message)s",
"%Y-%m-%d %H:%M:%S"))
console_handler.addFilter(RequestIDLogFilter())
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger().addHandler(console_handler)
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@time: 2021/04/27
@file: return_code.py
@function:
@modify:
"""
from flask import Response
from flask.json import dumps
class BASE_RESPONSE(Response):
def __init__(self, data=None, error_code=0, error_message='Success'):
if data:
result = dumps(dict(data=data, error_code=error_code, error_message=error_message))
else:
result = dumps(dict(error_code=error_code, error_message=error_message))
Response.__init__(self, result, mimetype='application/json')
SQL_ERROR = {
"error_code": 9001,
"error_message": "SQL execute error"
}
PARAMS_INVALID = {
"error_code": 1001,
"error_message": "params invalid"
}
DUPLICATE_RECORD = {
"error_code": 1002,
"error_message": "duplicate record"
}
......@@ -7,7 +7,7 @@ socket = 127.0.0.1:5051
chdir = /data/www/bestour
# python 启动程序文件
wsgi-file = app.py
wsgi-file = rent_app.py
# python 程序内用以启动的 application 变量名
callable = app
......@@ -18,4 +18,4 @@ processes = 1
# 线程数
threads = 2
daemonize = /data/www/bestour/server.log
;daemonize = /data/www/bestour/server.log
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment