Commit cf30e837 by Aeolus

update

parent 87905c8c
#!/usr/bin/.env python
from __future__ import print_function
import json
import logging
import os
import sys
import time
import requests
from flask import Flask, render_template, session, request, jsonify
from flask_socketio import SocketIO, Namespace, emit, join_room, leave_room, \
close_room, rooms, disconnect
from smartcard.scard import *
import smartcard.util
import engineio.async_drivers.gevent
import engineio.async_drivers.threading
# import engineio.async_drivers.eventlet
# logging.basicConfig(level=logging.DEBUG, # 控制台打印的日志级别
# filename='nfc.log',
# filemode='a', ##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
# # a是追加模式,默认如果不写的话,就是追加模式
# format=
# '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
# # 日志格式
# )
logger = logging.getLogger(__name__)
# async_mode = 'eventlet'
async_mode = 'gevent'
# async_mode = 'threading'
# if getattr(sys, 'frozen', False):
# template_folder = os.path.join(sys._MEIPASS, 'templates')
# static_folder = os.path.join(sys._MEIPASS, 'static')
# app = Flask(__name__, template_folder=template_folder, static_folder=static_folder)
# else:
app = Flask(__name__)
socketio = SocketIO(app, async_mode=async_mode, cors_allowed_origins="*",
# logger=True,
# engineio_logger=True
)
class ScardService(object):
def __init__(self, reader=None):
if reader is None:
self.reader = "ACS ACR1281 1S Dual Reader PICC 0"
else:
self.reader = reader
def get_reader_state(self, state):
reader, eventstate, atr = state
# logger.info(reader + " " + smartcard.util.toHexString(atr, smartcard.util.HEX))
if eventstate & SCARD_STATE_EMPTY:
message = 'Reader empty'
# logger.info(message)
return 0, message
if eventstate & SCARD_STATE_PRESENT:
message = 'Card present in reader'
# logger.info(message)
return 1, message
if eventstate & SCARD_STATE_ATRMATCH:
message = 'Card found'
# logger.info(message)
return 2, message
if eventstate & SCARD_STATE_UNAWARE:
message = 'State unware'
# logger.info(message)
return 3, message
if eventstate & SCARD_STATE_IGNORE:
message = 'Ignore reader'
# logger.info(message)
return 4, message
if eventstate & SCARD_STATE_UNAVAILABLE:
message = 'Reader unavailable'
# logger.info(message)
return 5, message
if eventstate & SCARD_STATE_EXCLUSIVE:
message = 'Card allocated for exclusive use by another application'
# logger.info(message)
return 6, message
if eventstate & SCARD_STATE_INUSE:
message = 'Card in used by another application but can be shared'
# logger.info(message)
return 7, message
if eventstate & SCARD_STATE_MUTE:
message = 'Card is mute'
# logger.info(message)
return 8, message
if eventstate & SCARD_STATE_CHANGED:
message = 'State changed'
# logger.info(message)
return 9, message
if eventstate & SCARD_STATE_UNKNOWN:
message = 'State unknowned'
# logger.info(message)
return 10, message
return False
def establish_context(self):
hresult, hcontext = SCardEstablishContext(SCARD_SCOPE_USER)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to establish context: ' + SCardGetErrorMessage(hresult))
# logger.info('Context established!')
return hcontext
def release_context(self, hcontext):
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' + SCardGetErrorMessage(hresult))
# logger.info('Released context.')
def find_self_reader(self, hcontext):
hresult, readers = SCardListReaders(hcontext, [])
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to list readers: ' + SCardGetErrorMessage(hresult))
if self.reader not in readers:
raise error('can not find reader ==> ' + self.reader)
return True
def handle_status_change(self, hcontext, readerstates):
try:
hresult, readerstates = SCardGetStatusChange(hcontext, INFINITE, readerstates)
rs_code, message = self.get_reader_state(readerstates[0])
if rs_code == 0:
# logger.info("请放入设备")
return
elif rs_code == 1:
self.card_transmit(hcontext, self.reader)
else:
# logger.info("未知错位,请重试")
exit(1)
except error as e:
pass
finally:
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' + SCardGetErrorMessage(hresult))
# print('Released context.')
def my_scard_handler(self):
try:
hresult, hcontext = SCardEstablishContext(SCARD_SCOPE_USER)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to establish context: ' + SCardGetErrorMessage(hresult))
# print('Context established!')
try:
# 获取读卡器列表,筛选PICC类型是否存在
hresult, readers = SCardListReaders(hcontext, [])
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to list readers: ' + SCardGetErrorMessage(hresult))
if self.reader not in readers:
raise error('can not find reader ==> ' + self.reader)
readerstates = [(self.reader, SCARD_STATE_UNAWARE)]
while True:
hresult, readerstates = SCardGetStatusChange(hcontext, INFINITE, readerstates)
rs_code, message = self.get_reader_state(readerstates[0])
if rs_code == 0:
# logger.info("请放入设备")
continue
elif rs_code == 1:
self.card_transmit(hcontext, self.reader)
else:
# logger.info("未知错位,请重试")
exit(1)
finally:
hresult = SCardReleaseContext(hcontext)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to release context: ' + \
SCardGetErrorMessage(hresult))
# print('Released context.')
except error as e:
pass
def card_transmit(self, hcontext, zreader, command):
GET_RESPONSE = [0xA0, 0xC0, 0x00, 0x00]
# print('Trying to select DF_TELECOM of card in', zreader)
try:
hresult, hcard, dwActiveProtocol = SCardConnect(
hcontext,
zreader,
SCARD_SHARE_SHARED, SCARD_PROTOCOL_T0 | SCARD_PROTOCOL_T1)
if hresult != SCARD_S_SUCCESS:
raise error(
'Unable to connect: ' +
SCardGetErrorMessage(hresult))
# print('Connected with active protocol', dwActiveProtocol)
try:
hresult, response = SCardTransmit(
hcard, dwActiveProtocol, command)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to transmit: ' +
SCardGetErrorMessage(hresult))
return False
# print('Selected DF_TELECOM: ' +
# smartcard.util.toHexString(
# response, smartcard.util.HEX))
return response
finally:
hresult = SCardDisconnect(hcard, SCARD_UNPOWER_CARD)
if hresult != SCARD_S_SUCCESS:
raise error(
'Failed to disconnect: ' +
SCardGetErrorMessage(hresult))
# print('Disconnected')
except error as message:
pass
# print(error, message)
@app.route("/", methods=["GET"])
def index():
return jsonify("my_response", {'data': 'index'})
@app.route('/sendCode', methods=['GET', 'POST'])
def send_code():
json_data = request.get_json()
phone = json_data['phone']
url = "https://guide.ssw-htzn.com/business_web/account/sendCode"
headers = {
"platform": "business_web"
}
data = {"phone": phone}
# json_data = json.dumps(data, ensure_ascii=False)
# result = requests.post(url=url, data=json_data.encode('utf-8'), headers=headers, verify=None)
result = requests.post(url=url, json=data, headers=headers)
result = json.loads(result.text)
if int(result["error_code"]) != 0:
return jsonify({"error_code": -1, "error_message": "send message error"})
return jsonify({"error_code": 0, "error_message": "send message success"})
@app.route('/login', methods=['GET', 'POST'])
def run_login():
token = request.headers.get('Authorization')
json_data = request.get_json()
data = {}
phone = json_data['phone'] if 'phone' in json_data else None
code = json_data['code'] if 'code' in json_data else None
login_type = json_data['type'] if 'type' in json_data else 1 # 1.验证码登录,2.密码登录
url = "https://guide.ssw-htzn.com/business_web/account/login"
headers = {
"platform": "business_web"
}
if token:
headers.update({"Authorization": token})
data = {"code": code, "phone": phone, "type": login_type}
result = requests.post(url=url, json=data, headers=headers)
result = json.loads(result.text)
if int(result["error_code"]) != 0:
return jsonify({"error_code": -1, "error_message": "login error"})
data = result["data"]
return jsonify({"data": data, "error_code": 0, "error_message": "send message success"})
class MyNamespace(Namespace):
def __init__(self, namespace=None):
super(Namespace, self).__init__(namespace)
self.scard = ScardService()
def on_send_code(self, message):
json_data = message
phone = json_data['phone']
url = "https://guide.ssw-htzn.com/business_web/account/sendCode"
headers = {
"platform": "business_web"
}
data = {"phone": phone}
# json_data = json.dumps(data, ensure_ascii=False)
# result = requests.post(url=url, data=json_data.encode('utf-8'), headers=headers, verify=None)
result = requests.post(url=url, json=data, headers=headers)
result = json.loads(result.text)
if int(result["error_code"]) != 0:
emit('send_code', {"error_code": -1, "error_message": "send message error"})
emit('send_code', {"error_code": 0, "error_message": "send message success"})
def on_login(self, message):
json_data = message
token = json_data.get('Authorization')
phone = json_data['phone'] if 'phone' in json_data else None
code = json_data['code'] if 'code' in json_data else None
login_type = json_data['type'] if 'type' in json_data else 1 # 1.验证码登录,2.密码登录
url = "https://guide.ssw-htzn.com/business_web/account/login"
headers = {
"platform": "business_web"
}
if token:
headers.update({"Authorization": token})
data = {"code": code, "phone": phone, "type": login_type}
result = requests.post(url=url, json=data, headers=headers)
result = json.loads(result.text)
if int(result["error_code"]) != 0:
emit('login', {"error_code": -1, "error_message": "login error"})
else:
data = result["data"]
emit('login', {"data": data, "error_code": 0, "error_message": "send message success"})
def on_reader_connect(self, message):
try:
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if result:
emit('reader_connect', {'data': True, 'error_code': 0, 'error_message': 'success'})
else:
emit('reader_connect',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
except error as e:
# logger.info(e)
pass
finally:
hresult = self.scard.release_context(hcontext)
# logger.info('Released context.')
def on_read_history(self, message):
try:
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if not result:
emit('read_history',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
history_data = []
for i in range(5):
command = [0xFF, 0xB0, 0x00, 10 + i, 0x10]
result = self.scard.card_transmit(hcontext, self.scard.reader, command)
if result:
data = result[:16]
return_code = result[-2:]
if data[0] == 0:
history_data.append({})
else:
data = [str(i) for i in data]
repair_date = ''.join(data[:8])
repair_content = int(''.join(data[8:10]))
repair_man = int(''.join(data[10:12]))
history_data.append(
{'repair_date': repair_date, 'repair_content': repair_content, 'repair_man': repair_man})
else:
emit('read_history',
{'data': None, 'error_code': 1002, 'error_message': '读取历史数据失败连接错误, reader connect error'})
emit('read_history',
{'data': history_data, 'error_code': 0, 'error_message': 'success'})
except error as e:
# logger.info(e)
pass
finally:
hresult = self.scard.release_context(hcontext)
# logger.info('Released context.')
def on_write_repair(self, message):
try:
repair_date = message["repair_date"]
repair_content = str(message["repair_content"]).zfill(2)
repair_man = str(message["repair_man"]).zfill(2)
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if not result:
emit('write_repair',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
for i in range(5):
read_command = [0xFF, 0xB0, 0x00, 10 + i, 0x10]
result = self.scard.card_transmit(hcontext, self.scard.reader, read_command)
if result:
data = result[:16]
return_code = result[-2:]
if data[0] == 0:
write_command = [0xFF, 0xD6, 0x00, 10 + i, 0x10]
write_command += [int(x) for x in repair_date]
write_command += [int(x) for x in repair_content]
write_command += [int(x) for x in repair_man]
write_command += [0, 0, 0, 0]
result = self.scard.card_transmit(hcontext, self.scard.reader, write_command)
return_code = result[-2:]
if return_code[0] == 144 and return_code[1] == 0:
emit('write_repair',
{'data': None, 'error_code': 0, 'error_message': 'success'})
return
else:
emit('write_repair',
{'data': None, 'error_code': 1003,
'error_message': '写入维修数据失败,请重试, write repair data error'})
break
emit('write_repair',
{'data': None, 'error_code': 0, 'error_message': 'success'})
except error as e:
# logger.info(e)
pass
finally:
hresult = self.scard.release_context(hcontext)
# logger.info('Released context.')
def on_read_sn(self, message):
try:
sn = ""
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if not result:
emit('read_sn',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
read_command = [0xFF, 0xB0, 0x00, 17, 0x10]
result = self.scard.card_transmit(hcontext, self.scard.reader, read_command)
if result:
data = result[:16]
return_code = result[-2:]
if return_code[0] == 144 and return_code[1] == 0:
sn = [hex(i).replace('0x', '').upper() for i in data]
sn = ''.join(sn)
sn = '07' + sn
else:
emit('read_sn',
{'data': None, 'error_code': 1003,
'error_message': '写入维修数据失败,请重试, write repair data error'})
emit('read_sn',
{'data': sn, 'error_code': 0, 'error_message': 'success'})
except error as e:
# logger.info(e)
pass
finally:
hresult = self.scard.release_context(hcontext)
# logger.info('Released context.')
def on_write_sn(self, message):
try:
sn = message["sn"]
token = message["Authorization"]
last_four = hex(int(sn[-4:])).replace('0x', '').zfill(4)
sn = sn[:-4] + last_four
# TODO 校验sn是否重复
sn = sn[2:]
power_no = sn[4:]
url = "https://guide.ssw-htzn.com/business_web/power/check_write_power_no"
headers = {
"Authorization": token,
"platform": "business_web"
}
data = {"power_no": sn}
# json_data = json.dumps(data, ensure_ascii=False)
# result = requests.post(url=url, data=json_data.encode('utf-8'), headers=headers, verify=None)
result = requests.post(url=url, json=data, headers=headers)
# print(result.text)
result = json.loads(result.text)
if int(result["error_code"]) != 0:
emit('write_sn',
{'data': None, 'error_code': 1005,
'error_message': 'sn数据重复,请重试, sn exist!'})
return
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if not result:
emit('write_sn',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
write_command = [0xFF, 0xD6, 0x00, 17, 0x10]
write_command += [int(i, 16) for i in sn]
result = self.scard.card_transmit(hcontext, self.scard.reader, write_command)
return_code = result[-2:]
if return_code[0] == 144 and return_code[1] == 0:
pass
else:
emit('write_sn',
{'data': None, 'error_code': 1004,
'error_message': '写入sn数据失败,请重试, write repair data error'})
return
emit('write_sn',
{'data': None, 'error_code': 0, 'error_message': 'success'})
except error as e:
# logger.info(e)
pass
finally:
hresult = self.scard.release_context(hcontext)
# logger.info('Released context.')
def on_delete_all(self, message):
try:
password = message["password"]
if password != "sswnb":
emit('delete_all',
{'data': None, 'error_code': 1008,
'error_message': 'password error ,密码错误,请重试'})
return
hcontext = self.scard.establish_context()
result = self.scard.find_self_reader(hcontext)
if not result:
emit('delete_all',
{'data': None, 'error_code': 1001, 'error_message': '读卡器连接失败, reader connect error'})
return
# 写入全为0的sn
write_command = [0xFF, 0xD6, 0x00, 17, 0x10]
write_command += [0 for i in range(16)]
result = self.scard.card_transmit(hcontext, self.scard.reader, write_command)
return_code = result[-2:]
if return_code[0] == 144 and return_code[1] == 0:
pass
else:
emit('delete_all',
{'data': None, 'error_code': 1007,
'error_message': '删除sn数据失败,请重试, write repair data error'})
return
# 写入全为0的维修数据
for i in range(5):
write_command = [0xFF, 0xD6, 0x00, 10 + i, 0x10]
write_command += [0 for x in range(16)]
result = self.scard.card_transmit(hcontext, self.scard.reader, write_command)
return_code = result[-2:]
if return_code[0] == 144 and return_code[1] == 0:
pass
else:
emit('delete_all',
{'data': None, 'error_code': 1006,
'error_message': '删除维修数据失败,请重试, write repair data error'})
return
emit('delete_all',
{'data': None, 'error_code': 0, 'error_message': 'success'})
except error as e:
# logger.info(e)
emit('delete_all',
{'data': None, 'error_code': 1009,
'error_message': 'unkwon error, 删除数据,未知错误'})
finally:
hresult = self.scard.release_context(hcontext)
# logger.info('Released context.')
# 广播事件,接收消息后广播
def on_my_broadcast_event(self, message):
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': message['data'], 'count': session['receive_count']},
broadcast=True)
def on_ping(self):
emit('my_pong')
def on_connect(self):
emit('my_response', {'data': 'Connected'})
def on_disconnect(self):
emit('my_response', {'data': 'Connected'})
# print('Client disconnected', request.sid)
name_space = MyNamespace()
socketio.on_namespace(name_space)
if __name__ == '__main__':
print('程序启动中....')
time.sleep(1)
print('程序已启动....')
socketio.run(app, host='127.0.0.1', port=5000)
Flask == 2.0.1
Flask-Cors == 3.0.10
Flask-Log-Request-ID == 0.10.1
python-dotenv == 0.19.0
Flask-SocketIO == 5.1.1
pyscard == 2.0.1
flask-redis == 0.4.0
PyJWT == 2.1.0
simple-websocket == 0.3.0
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment