Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
N
nfc_storage
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
冯佳佳
nfc_storage
Commits
b7494619
Commit
b7494619
authored
Oct 14, 2021
by
Aeolus
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update
parent
6f3499e2
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
701 additions
and
10 deletions
+701
-10
app.py
+444
-0
myapps/nfc/repair_tool/socket_server/app.py
+196
-4
myapps/nfc/socket_server/example.py
+7
-1
myapps/nfc/socket_server/socketio_client.py
+31
-3
poetry.lock
+19
-1
pyproject.toml
+2
-0
utils/jwt_util.py
+2
-1
No files found.
app.py
0 → 100644
View file @
b7494619
#!/usr/bin/.env python
from
__future__
import
print_function
import
json
import
logging
import
os
import
sys
from
threading
import
Lock
import
requests
from
flask
import
Flask
,
render_template
,
session
,
request
from
flask_socketio
import
SocketIO
,
Namespace
,
emit
,
join_room
,
leave_room
,
\
close_room
,
rooms
,
disconnect
# Set this variable to "threading", "eventlet" or "gevent" to test the
# different async modes, or leave it set to None for the application to choose
# the best option based on installed packages.
# from myapps.nfc.repair_tool.service.scard_service import ScardService
from
myapps.nfc.repair_tool.socket_server
import
create_app
from
smartcard.scard
import
*
import
smartcard.util
from
smartcard.scard
import
error
as
scard_error
,
SCARD_S_SUCCESS
,
SCardGetErrorMessage
logger
=
logging
.
getLogger
(
__name__
)
async_mode
=
'threading'
# app = create_app(os.getenv('RUN_ENV', 'prod'))
if
getattr
(
sys
,
'frozen'
,
False
):
template_folder
=
os
.
path
.
join
(
sys
.
_MEIPASS
,
'templates'
)
static_folder
=
os
.
path
.
join
(
sys
.
_MEIPASS
,
'static'
)
app
=
Flask
(
__name__
,
template_folder
=
template_folder
,
static_folder
=
static_folder
)
else
:
app
=
Flask
(
__name__
)
socketio
=
SocketIO
(
app
,
cors_allowed_origins
=
"*"
,
logger
=
True
,
engineio_logger
=
True
)
thread
=
None
thread_lock
=
Lock
()
class
ScardService
(
object
):
def
__init__
(
self
,
reader
=
None
):
if
reader
is
None
:
self
.
reader
=
"ACS ACR1281 1S Dual Reader PICC 0"
else
:
self
.
reader
=
reader
def
get_reader_state
(
self
,
state
):
reader
,
eventstate
,
atr
=
state
logger
.
info
(
reader
+
" "
+
smartcard
.
util
.
toHexString
(
atr
,
smartcard
.
util
.
HEX
))
if
eventstate
&
SCARD_STATE_EMPTY
:
message
=
'Reader empty'
logger
.
info
(
message
)
return
0
,
message
if
eventstate
&
SCARD_STATE_PRESENT
:
message
=
'Card present in reader'
logger
.
info
(
message
)
return
1
,
message
if
eventstate
&
SCARD_STATE_ATRMATCH
:
message
=
'Card found'
logger
.
info
(
message
)
return
2
,
message
if
eventstate
&
SCARD_STATE_UNAWARE
:
message
=
'State unware'
logger
.
info
(
message
)
return
3
,
message
if
eventstate
&
SCARD_STATE_IGNORE
:
message
=
'Ignore reader'
logger
.
info
(
message
)
return
4
,
message
if
eventstate
&
SCARD_STATE_UNAVAILABLE
:
message
=
'Reader unavailable'
logger
.
info
(
message
)
return
5
,
message
if
eventstate
&
SCARD_STATE_EXCLUSIVE
:
message
=
'Card allocated for exclusive use by another application'
logger
.
info
(
message
)
return
6
,
message
if
eventstate
&
SCARD_STATE_INUSE
:
message
=
'Card in used by another application but can be shared'
logger
.
info
(
message
)
return
7
,
message
if
eventstate
&
SCARD_STATE_MUTE
:
message
=
'Card is mute'
logger
.
info
(
message
)
return
8
,
message
if
eventstate
&
SCARD_STATE_CHANGED
:
message
=
'State changed'
logger
.
info
(
message
)
return
9
,
message
if
eventstate
&
SCARD_STATE_UNKNOWN
:
message
=
'State unknowned'
logger
.
info
(
message
)
return
10
,
message
return
False
def
establish_context
(
self
):
hresult
,
hcontext
=
SCardEstablishContext
(
SCARD_SCOPE_USER
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to establish context: '
+
SCardGetErrorMessage
(
hresult
))
logger
.
info
(
'Context established!'
)
return
hcontext
def
release_context
(
self
,
hcontext
):
hresult
=
SCardReleaseContext
(
hcontext
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to release context: '
+
SCardGetErrorMessage
(
hresult
))
logger
.
info
(
'Released context.'
)
def
find_self_reader
(
self
,
hcontext
):
hresult
,
readers
=
SCardListReaders
(
hcontext
,
[])
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to list readers: '
+
SCardGetErrorMessage
(
hresult
))
if
self
.
reader
not
in
readers
:
raise
error
(
'can not find reader ==> '
+
self
.
reader
)
return
True
def
handle_status_change
(
self
,
hcontext
,
readerstates
):
try
:
hresult
,
readerstates
=
SCardGetStatusChange
(
hcontext
,
INFINITE
,
readerstates
)
rs_code
,
message
=
self
.
get_reader_state
(
readerstates
[
0
])
if
rs_code
==
0
:
logger
.
info
(
"请放入设备"
)
return
elif
rs_code
==
1
:
self
.
card_transmit
(
hcontext
,
self
.
reader
)
else
:
logger
.
info
(
"未知错位,请重试"
)
exit
(
1
)
except
error
as
e
:
print
(
e
)
finally
:
hresult
=
SCardReleaseContext
(
hcontext
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to release context: '
+
SCardGetErrorMessage
(
hresult
))
print
(
'Released context.'
)
def
my_scard_handler
(
self
):
try
:
hresult
,
hcontext
=
SCardEstablishContext
(
SCARD_SCOPE_USER
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to establish context: '
+
SCardGetErrorMessage
(
hresult
))
print
(
'Context established!'
)
try
:
# 获取读卡器列表,筛选PICC类型是否存在
hresult
,
readers
=
SCardListReaders
(
hcontext
,
[])
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to list readers: '
+
SCardGetErrorMessage
(
hresult
))
if
self
.
reader
not
in
readers
:
raise
error
(
'can not find reader ==> '
+
self
.
reader
)
readerstates
=
[(
self
.
reader
,
SCARD_STATE_UNAWARE
)]
while
True
:
hresult
,
readerstates
=
SCardGetStatusChange
(
hcontext
,
INFINITE
,
readerstates
)
rs_code
,
message
=
self
.
get_reader_state
(
readerstates
[
0
])
if
rs_code
==
0
:
logger
.
info
(
"请放入设备"
)
continue
elif
rs_code
==
1
:
self
.
card_transmit
(
hcontext
,
self
.
reader
)
else
:
logger
.
info
(
"未知错位,请重试"
)
exit
(
1
)
finally
:
hresult
=
SCardReleaseContext
(
hcontext
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to release context: '
+
\
SCardGetErrorMessage
(
hresult
))
print
(
'Released context.'
)
except
error
as
e
:
print
(
e
)
def
card_transmit
(
self
,
hcontext
,
zreader
,
command
):
GET_RESPONSE
=
[
0xA0
,
0xC0
,
0x00
,
0x00
]
print
(
'Trying to select DF_TELECOM of card in'
,
zreader
)
try
:
hresult
,
hcard
,
dwActiveProtocol
=
SCardConnect
(
hcontext
,
zreader
,
SCARD_SHARE_SHARED
,
SCARD_PROTOCOL_T0
|
SCARD_PROTOCOL_T1
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Unable to connect: '
+
SCardGetErrorMessage
(
hresult
))
print
(
'Connected with active protocol'
,
dwActiveProtocol
)
try
:
hresult
,
response
=
SCardTransmit
(
hcard
,
dwActiveProtocol
,
command
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to transmit: '
+
SCardGetErrorMessage
(
hresult
))
return
False
print
(
'Selected DF_TELECOM: '
+
smartcard
.
util
.
toHexString
(
response
,
smartcard
.
util
.
HEX
))
return
response
finally
:
hresult
=
SCardDisconnect
(
hcard
,
SCARD_UNPOWER_CARD
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to disconnect: '
+
SCardGetErrorMessage
(
hresult
))
print
(
'Disconnected'
)
except
error
as
message
:
print
(
error
,
message
)
@app.route
(
"/"
,
methods
=
[
"GET"
])
def
index
():
socketio
.
emit
(
"my_response"
,
{
'data'
:
'index'
})
def
background_thread
():
"""Example of how to send server generated events to clients."""
count
=
0
while
True
:
socketio
.
sleep
(
10
)
count
+=
1
socketio
.
emit
(
'my_response'
,
{
'data'
:
'Server generated event'
,
'count'
:
count
},
namespace
=
'/test'
)
class
MyNamespace
(
Namespace
):
def
__init__
(
self
,
namespace
=
None
):
super
(
Namespace
,
self
)
.
__init__
(
namespace
)
self
.
scard
=
ScardService
()
def
on_reader_connect
(
self
,
message
):
try
:
hcontext
=
self
.
scard
.
establish_context
()
result
=
self
.
scard
.
find_self_reader
(
hcontext
)
if
result
:
emit
(
'reader_connect'
,
{
'data'
:
True
,
'error_code'
:
0
,
'error_message'
:
'success'
})
else
:
emit
(
'reader_connect'
,
{
'data'
:
None
,
'error_code'
:
1001
,
'error_message'
:
'读卡器连接失败, reader connect error'
})
except
scard_error
as
e
:
logger
.
info
(
e
)
finally
:
hresult
=
self
.
scard
.
release_context
(
hcontext
)
logger
.
info
(
'Released context.'
)
def
on_read_history
(
self
,
message
):
try
:
hcontext
=
self
.
scard
.
establish_context
()
result
=
self
.
scard
.
find_self_reader
(
hcontext
)
if
not
result
:
emit
(
'read_history'
,
{
'data'
:
None
,
'error_code'
:
1001
,
'error_message'
:
'读卡器连接失败, reader connect error'
})
history_data
=
[]
for
i
in
range
(
5
):
command
=
[
0xFF
,
0xB0
,
0x00
,
10
+
i
,
0x10
]
result
=
self
.
scard
.
card_transmit
(
hcontext
,
self
.
scard
.
reader
,
command
)
if
result
:
data
=
result
[:
16
]
return_code
=
result
[
-
2
:]
if
data
[
0
]
==
0
:
history_data
.
append
({})
else
:
data
=
[
str
(
i
)
for
i
in
data
]
repair_date
=
''
.
join
(
data
[:
8
])
repair_content
=
int
(
''
.
join
(
data
[
8
:
10
]))
repair_man
=
int
(
''
.
join
(
data
[
10
:
12
]))
history_data
.
append
(
{
'repair_date'
:
repair_date
,
'repair_content'
:
repair_content
,
'repair_man'
:
repair_man
})
else
:
emit
(
'read_history'
,
{
'data'
:
None
,
'error_code'
:
1002
,
'error_message'
:
'读取历史数据失败连接错误, reader connect error'
})
emit
(
'read_history'
,
{
'data'
:
history_data
,
'error_code'
:
0
,
'error_message'
:
'success'
})
except
scard_error
as
e
:
logger
.
info
(
e
)
finally
:
hresult
=
self
.
scard
.
release_context
(
hcontext
)
logger
.
info
(
'Released context.'
)
def
on_write_repair
(
self
,
message
):
try
:
repair_date
=
message
[
"repair_date"
]
repair_content
=
str
(
message
[
"repair_content"
])
.
zfill
(
2
)
repair_man
=
str
(
message
[
"repair_man"
])
.
zfill
(
2
)
hcontext
=
self
.
scard
.
establish_context
()
result
=
self
.
scard
.
find_self_reader
(
hcontext
)
if
not
result
:
emit
(
'write_repair'
,
{
'data'
:
None
,
'error_code'
:
1001
,
'error_message'
:
'读卡器连接失败, reader connect error'
})
history_data
=
[]
for
i
in
range
(
5
):
read_command
=
[
0xFF
,
0xB0
,
0x00
,
10
+
i
,
0x10
]
result
=
self
.
scard
.
card_transmit
(
hcontext
,
self
.
scard
.
reader
,
read_command
)
if
result
:
data
=
result
[:
16
]
return_code
=
result
[
-
2
:]
if
data
[
0
]
==
0
:
write_command
=
[
0xFF
,
0xD6
,
0x00
,
10
+
i
,
0x10
]
write_command
+=
[
int
(
x
)
for
x
in
repair_date
]
write_command
+=
[
int
(
x
)
for
x
in
repair_content
]
write_command
+=
[
int
(
x
)
for
x
in
repair_man
]
write_command
+=
[
0
,
0
,
0
,
0
]
result
=
self
.
scard
.
card_transmit
(
hcontext
,
self
.
scard
.
reader
,
write_command
)
return_code
=
result
[
-
2
:]
if
return_code
[
0
]
==
144
and
return_code
[
1
]
==
0
:
pass
else
:
emit
(
'write_repair'
,
{
'data'
:
None
,
'error_code'
:
1003
,
'error_message'
:
'写入维修数据失败,请重试, write repair data error'
})
break
emit
(
'write_repair'
,
{
'data'
:
history_data
,
'error_code'
:
0
,
'error_message'
:
'success'
})
except
scard_error
as
e
:
logger
.
info
(
e
)
finally
:
hresult
=
self
.
scard
.
release_context
(
hcontext
)
logger
.
info
(
'Released context.'
)
def
on_read_sn
(
self
,
message
):
try
:
hcontext
=
self
.
scard
.
establish_context
()
result
=
self
.
scard
.
find_self_reader
(
hcontext
)
if
not
result
:
emit
(
'read_sn'
,
{
'data'
:
None
,
'error_code'
:
1001
,
'error_message'
:
'读卡器连接失败, reader connect error'
})
read_command
=
[
0xFF
,
0xB0
,
0x00
,
17
,
0x10
]
result
=
self
.
scard
.
card_transmit
(
hcontext
,
self
.
scard
.
reader
,
read_command
)
if
result
:
data
=
result
[:
16
]
return_code
=
result
[
-
2
:]
if
return_code
[
0
]
==
144
and
return_code
[
1
]
==
0
:
sn
=
[
hex
(
i
)
.
replace
(
'0x'
,
''
)
.
upper
()
for
i
in
data
]
sn
=
''
.
join
(
sn
)
sn
=
'07'
+
sn
else
:
emit
(
'read_sn'
,
{
'data'
:
None
,
'error_code'
:
1003
,
'error_message'
:
'写入维修数据失败,请重试, write repair data error'
})
emit
(
'read_sn'
,
{
'data'
:
sn
,
'error_code'
:
0
,
'error_message'
:
'success'
})
except
scard_error
as
e
:
logger
.
info
(
e
)
finally
:
hresult
=
self
.
scard
.
release_context
(
hcontext
)
logger
.
info
(
'Released context.'
)
def
on_write_sn
(
self
,
message
):
try
:
sn
=
message
[
"sn"
]
token
=
message
[
"Authorization"
]
last_four
=
hex
(
int
(
sn
[
-
4
:]))
.
replace
(
'0x'
,
''
)
.
zfill
(
4
)
sn
=
sn
[:
-
4
]
+
last_four
# TODO 校验sn是否重复
sn
=
sn
[
2
:]
power_no
=
sn
[
4
:]
url
=
"https://guide.ssw-htzn.com/business_web/power/check_write_power_no"
headers
=
{
"Authorization"
:
token
,
"platform"
:
"business_web"
}
data
=
{
"power_no"
:
sn
}
# json_data = json.dumps(data, ensure_ascii=False)
# result = requests.post(url=url, data=json_data.encode('utf-8'), headers=headers, verify=None)
result
=
requests
.
post
(
url
=
url
,
json
=
data
,
headers
=
headers
)
print
(
result
.
text
)
result
=
json
.
loads
(
result
.
text
)
if
int
(
result
[
"error_code"
])
!=
0
:
emit
(
'write_sn'
,
{
'data'
:
None
,
'error_code'
:
1005
,
'error_message'
:
'sn数据重复,请重试, sn exist!'
})
hcontext
=
self
.
scard
.
establish_context
()
result
=
self
.
scard
.
find_self_reader
(
hcontext
)
if
not
result
:
emit
(
'write_sn'
,
{
'data'
:
None
,
'error_code'
:
1001
,
'error_message'
:
'读卡器连接失败, reader connect error'
})
write_command
=
[
0xFF
,
0xD6
,
0x00
,
17
,
0x10
]
write_command
+=
[
int
(
i
,
16
)
for
i
in
sn
]
result
=
self
.
scard
.
card_transmit
(
hcontext
,
self
.
scard
.
reader
,
write_command
)
return_code
=
result
[
-
2
:]
if
return_code
[
0
]
==
144
and
return_code
[
1
]
==
0
:
pass
else
:
emit
(
'write_sn'
,
{
'data'
:
None
,
'error_code'
:
1004
,
'error_message'
:
'写入sn数据失败,请重试, write repair data error'
})
emit
(
'write_sn'
,
{
'data'
:
None
,
'error_code'
:
0
,
'error_message'
:
'success'
})
except
scard_error
as
e
:
logger
.
info
(
e
)
finally
:
hresult
=
self
.
scard
.
release_context
(
hcontext
)
logger
.
info
(
'Released context.'
)
# 广播事件,接收消息后广播
def
on_my_broadcast_event
(
self
,
message
):
session
[
'receive_count'
]
=
session
.
get
(
'receive_count'
,
0
)
+
1
emit
(
'my_response'
,
{
'data'
:
message
[
'data'
],
'count'
:
session
[
'receive_count'
]},
broadcast
=
True
)
def
on_ping
(
self
):
emit
(
'my_pong'
)
def
on_connect
(
self
):
emit
(
'my_response'
,
{
'data'
:
'Connected'
})
def
on_disconnect
(
self
):
print
(
'Client disconnected'
,
request
.
sid
)
name_space
=
MyNamespace
()
socketio
.
on_namespace
(
name_space
)
if
__name__
==
'__main__'
:
socketio
.
run
(
app
)
myapps/nfc/repair_tool/socket_server/app.py
View file @
b7494619
#!/usr/bin/.env python
#!/usr/bin/.env python
from
__future__
import
print_function
import
json
import
json
import
logging
import
logging
import
os
import
os
...
@@ -12,20 +13,210 @@ from flask_socketio import SocketIO, Namespace, emit, join_room, leave_room, \
...
@@ -12,20 +13,210 @@ from flask_socketio import SocketIO, Namespace, emit, join_room, leave_room, \
# Set this variable to "threading", "eventlet" or "gevent" to test the
# Set this variable to "threading", "eventlet" or "gevent" to test the
# different async modes, or leave it set to None for the application to choose
# different async modes, or leave it set to None for the application to choose
# the best option based on installed packages.
# the best option based on installed packages.
from
myapps.nfc.repair_tool.service.scard_service
import
ScardService
#
from myapps.nfc.repair_tool.service.scard_service import ScardService
from
myapps.nfc.repair_tool.socket_server
import
create_app
from
myapps.nfc.repair_tool.socket_server
import
create_app
from
smartcard.scard
import
*
import
smartcard.util
from
smartcard.scard
import
error
as
scard_error
,
SCARD_S_SUCCESS
,
SCardGetErrorMessage
from
smartcard.scard
import
error
as
scard_error
,
SCARD_S_SUCCESS
,
SCardGetErrorMessage
logger
=
logging
.
getLogger
(
__name__
)
logger
=
logging
.
getLogger
(
__name__
)
async_mode
=
'threading'
async_mode
=
'threading'
app
=
create_app
(
os
.
getenv
(
'RUN_ENV'
,
'prod'
))
# app = create_app(os.getenv('RUN_ENV', 'prod'))
app
=
Flask
(
__name__
)
socketio
=
SocketIO
(
app
,
cors_allowed_origins
=
"*"
,
logger
=
True
,
engineio_logger
=
True
)
socketio
=
SocketIO
(
app
,
cors_allowed_origins
=
"*"
,
logger
=
True
,
engineio_logger
=
True
)
thread
=
None
thread
=
None
thread_lock
=
Lock
()
thread_lock
=
Lock
()
class
ScardService
(
object
):
def
__init__
(
self
,
reader
=
None
):
if
reader
is
None
:
self
.
reader
=
"ACS ACR1281 1S Dual Reader PICC 0"
else
:
self
.
reader
=
reader
def
get_reader_state
(
self
,
state
):
reader
,
eventstate
,
atr
=
state
logger
.
info
(
reader
+
" "
+
smartcard
.
util
.
toHexString
(
atr
,
smartcard
.
util
.
HEX
))
if
eventstate
&
SCARD_STATE_EMPTY
:
message
=
'Reader empty'
logger
.
info
(
message
)
return
0
,
message
if
eventstate
&
SCARD_STATE_PRESENT
:
message
=
'Card present in reader'
logger
.
info
(
message
)
return
1
,
message
if
eventstate
&
SCARD_STATE_ATRMATCH
:
message
=
'Card found'
logger
.
info
(
message
)
return
2
,
message
if
eventstate
&
SCARD_STATE_UNAWARE
:
message
=
'State unware'
logger
.
info
(
message
)
return
3
,
message
if
eventstate
&
SCARD_STATE_IGNORE
:
message
=
'Ignore reader'
logger
.
info
(
message
)
return
4
,
message
if
eventstate
&
SCARD_STATE_UNAVAILABLE
:
message
=
'Reader unavailable'
logger
.
info
(
message
)
return
5
,
message
if
eventstate
&
SCARD_STATE_EXCLUSIVE
:
message
=
'Card allocated for exclusive use by another application'
logger
.
info
(
message
)
return
6
,
message
if
eventstate
&
SCARD_STATE_INUSE
:
message
=
'Card in used by another application but can be shared'
logger
.
info
(
message
)
return
7
,
message
if
eventstate
&
SCARD_STATE_MUTE
:
message
=
'Card is mute'
logger
.
info
(
message
)
return
8
,
message
if
eventstate
&
SCARD_STATE_CHANGED
:
message
=
'State changed'
logger
.
info
(
message
)
return
9
,
message
if
eventstate
&
SCARD_STATE_UNKNOWN
:
message
=
'State unknowned'
logger
.
info
(
message
)
return
10
,
message
return
False
def
establish_context
(
self
):
hresult
,
hcontext
=
SCardEstablishContext
(
SCARD_SCOPE_USER
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to establish context: '
+
SCardGetErrorMessage
(
hresult
))
logger
.
info
(
'Context established!'
)
return
hcontext
def
release_context
(
self
,
hcontext
):
hresult
=
SCardReleaseContext
(
hcontext
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to release context: '
+
SCardGetErrorMessage
(
hresult
))
logger
.
info
(
'Released context.'
)
def
find_self_reader
(
self
,
hcontext
):
hresult
,
readers
=
SCardListReaders
(
hcontext
,
[])
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to list readers: '
+
SCardGetErrorMessage
(
hresult
))
if
self
.
reader
not
in
readers
:
raise
error
(
'can not find reader ==> '
+
self
.
reader
)
return
True
def
handle_status_change
(
self
,
hcontext
,
readerstates
):
try
:
hresult
,
readerstates
=
SCardGetStatusChange
(
hcontext
,
INFINITE
,
readerstates
)
rs_code
,
message
=
self
.
get_reader_state
(
readerstates
[
0
])
if
rs_code
==
0
:
logger
.
info
(
"请放入设备"
)
return
elif
rs_code
==
1
:
self
.
card_transmit
(
hcontext
,
self
.
reader
)
else
:
logger
.
info
(
"未知错位,请重试"
)
exit
(
1
)
except
error
as
e
:
print
(
e
)
finally
:
hresult
=
SCardReleaseContext
(
hcontext
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to release context: '
+
SCardGetErrorMessage
(
hresult
))
print
(
'Released context.'
)
def
my_scard_handler
(
self
):
try
:
hresult
,
hcontext
=
SCardEstablishContext
(
SCARD_SCOPE_USER
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to establish context: '
+
SCardGetErrorMessage
(
hresult
))
print
(
'Context established!'
)
try
:
# 获取读卡器列表,筛选PICC类型是否存在
hresult
,
readers
=
SCardListReaders
(
hcontext
,
[])
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to list readers: '
+
SCardGetErrorMessage
(
hresult
))
if
self
.
reader
not
in
readers
:
raise
error
(
'can not find reader ==> '
+
self
.
reader
)
readerstates
=
[(
self
.
reader
,
SCARD_STATE_UNAWARE
)]
while
True
:
hresult
,
readerstates
=
SCardGetStatusChange
(
hcontext
,
INFINITE
,
readerstates
)
rs_code
,
message
=
self
.
get_reader_state
(
readerstates
[
0
])
if
rs_code
==
0
:
logger
.
info
(
"请放入设备"
)
continue
elif
rs_code
==
1
:
self
.
card_transmit
(
hcontext
,
self
.
reader
)
else
:
logger
.
info
(
"未知错位,请重试"
)
exit
(
1
)
finally
:
hresult
=
SCardReleaseContext
(
hcontext
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to release context: '
+
\
SCardGetErrorMessage
(
hresult
))
print
(
'Released context.'
)
except
error
as
e
:
print
(
e
)
def
card_transmit
(
self
,
hcontext
,
zreader
,
command
):
GET_RESPONSE
=
[
0xA0
,
0xC0
,
0x00
,
0x00
]
print
(
'Trying to select DF_TELECOM of card in'
,
zreader
)
try
:
hresult
,
hcard
,
dwActiveProtocol
=
SCardConnect
(
hcontext
,
zreader
,
SCARD_SHARE_SHARED
,
SCARD_PROTOCOL_T0
|
SCARD_PROTOCOL_T1
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Unable to connect: '
+
SCardGetErrorMessage
(
hresult
))
print
(
'Connected with active protocol'
,
dwActiveProtocol
)
try
:
hresult
,
response
=
SCardTransmit
(
hcard
,
dwActiveProtocol
,
command
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to transmit: '
+
SCardGetErrorMessage
(
hresult
))
return
False
print
(
'Selected DF_TELECOM: '
+
smartcard
.
util
.
toHexString
(
response
,
smartcard
.
util
.
HEX
))
return
response
finally
:
hresult
=
SCardDisconnect
(
hcard
,
SCARD_UNPOWER_CARD
)
if
hresult
!=
SCARD_S_SUCCESS
:
raise
error
(
'Failed to disconnect: '
+
SCardGetErrorMessage
(
hresult
))
print
(
'Disconnected'
)
except
error
as
message
:
print
(
error
,
message
)
@app.route
(
"/"
,
methods
=
[
"GET"
])
@app.route
(
"/"
,
methods
=
[
"GET"
])
def
index
():
def
index
():
socketio
.
emit
(
"my_response"
,
{
'data'
:
'index'
})
socketio
.
emit
(
"my_response"
,
{
'data'
:
'index'
})
...
@@ -187,8 +378,9 @@ class MyNamespace(Namespace):
...
@@ -187,8 +378,9 @@ class MyNamespace(Namespace):
}
}
data
=
{
"power_no"
:
sn
}
data
=
{
"power_no"
:
sn
}
json_data
=
json
.
dumps
(
data
,
ensure_ascii
=
False
)
# json_data = json.dumps(data, ensure_ascii=False)
result
=
requests
.
post
(
url
=
url
,
data
=
json_data
.
encode
(
'utf-8'
),
headers
=
headers
,
verify
=
None
)
# result = requests.post(url=url, data=json_data.encode('utf-8'), headers=headers, verify=None)
result
=
requests
.
post
(
url
=
url
,
json
=
data
,
headers
=
headers
)
print
(
result
.
text
)
print
(
result
.
text
)
result
=
json
.
loads
(
result
.
text
)
result
=
json
.
loads
(
result
.
text
)
if
int
(
result
[
"error_code"
])
!=
0
:
if
int
(
result
[
"error_code"
])
!=
0
:
...
...
myapps/nfc/socket_server/example.py
View file @
b7494619
from
flask
import
Flask
,
render_template
from
flask
import
Flask
,
render_template
,
request
from
flask_socketio
import
SocketIO
,
emit
from
flask_socketio
import
SocketIO
,
emit
app
=
Flask
(
__name__
)
app
=
Flask
(
__name__
)
...
@@ -11,6 +11,12 @@ socketio = SocketIO(app, cors_allowed_origins="*", logger=True, engineio_logger=
...
@@ -11,6 +11,12 @@ socketio = SocketIO(app, cors_allowed_origins="*", logger=True, engineio_logger=
def
index
():
def
index
():
return
render_template
(
'index.html'
)
return
render_template
(
'index.html'
)
@app.route
(
'/sendCode'
,
methods
=
[
'GET'
,
'POST'
])
def
send_code
():
json_data
=
request
.
get_json
()
phone
=
json_data
[
'phone'
]
return
render_template
(
'index.html'
)
# @socketio.event
# @socketio.event
# def my_event(message):
# def my_event(message):
...
...
myapps/nfc/socket_server/socketio_client.py
View file @
b7494619
...
@@ -8,6 +8,7 @@ author:Aeolus
...
@@ -8,6 +8,7 @@ author:Aeolus
@function:
@function:
@modify:
@modify:
"""
"""
import
time
import
socketio
import
socketio
...
@@ -52,18 +53,25 @@ def on_message(data):
...
@@ -52,18 +53,25 @@ def on_message(data):
print
(
'I received a write_sn!'
)
print
(
'I received a write_sn!'
)
print
(
data
)
print
(
data
)
@sio.on
(
'read_sn'
)
@sio.on
(
'read_sn'
)
def
on_message
(
data
):
def
on_message
(
data
):
print
(
'I received a read_sn!'
)
print
(
'I received a read_sn!'
)
print
(
data
)
print
(
data
)
@sio.on
(
'send_code'
)
def
on_message
(
data
):
print
(
'I received a send code!'
)
print
(
data
)
@sio.event
@sio.event
def
disconnect
():
def
disconnect
():
print
(
'disconnected from server'
)
print
(
'disconnected from server'
)
sio
.
connect
(
'http://127.0.0.1:5000'
,
auth
=
{
"token"
:
"1234"
}
)
sio
.
connect
(
'http://127.0.0.1:5000'
)
# sio.connect('http://127.0.0.1:5000')
# sio.connect('http://127.0.0.1:5000')
# sio.wait()
# sio.wait()
# # sio.emit('my_message', {'response': 'my response'})
# # sio.emit('my_message', {'response': 'my response'})
...
@@ -73,9 +81,29 @@ sio.connect('http://127.0.0.1:5000', auth={"token": "1234"})
...
@@ -73,9 +81,29 @@ sio.connect('http://127.0.0.1:5000', auth={"token": "1234"})
# sio.emit('write_repair', {'repair_date': '20210829', 'repair_content': 4, 'repair_man': 9})
# sio.emit('write_repair', {'repair_date': '20210829', 'repair_content': 4, 'repair_man': 9})
# sio.emit('write_repair', {'repair_date': '20210830', 'repair_content': 5, 'repair_man': 10})
# sio.emit('write_repair', {'repair_date': '20210830', 'repair_content': 5, 'repair_man': 10})
# sio.emit('read_history', {'repair_date': '20210826', 'repair_content': '01', 'repair_man': '01'})
# sio.emit('read_history', {'repair_date': '20210826', 'repair_content': '01', 'repair_man': '01'})
# sio.emit('write_sn', {'sn': '07E2E3070A01411104', 'repair_content': '01', 'repair_man': '01'})
# sio.emit('write_sn', {
sio
.
emit
(
'read_sn'
,
{
'sn'
:
'07E2E3070A01411104'
,
'repair_content'
:
'01'
,
'repair_man'
:
'01'
})
# "Authorization": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MzE2OTM1ODcuMzMyMjU4MiwidXNlcl9pZCI6MTQ1fQ.IZT9cDYtjTRlkrIG_2Gbux0n07KzKGfad9-GNqcWwjk",
# "sn": "07E2E5090A03431231", "message": "写入sn"})
# sio.emit('read_sn', {'data': '01', })
# sio.emit('read_sn', {'data': '01', })
# sio.emit('read_sn', {'data': '01', })
# sio.emit('read_sn', {'data': '01', })
# sio.emit('read_sn', {'data': '01', })
# sio.emit('read_sn', {'data': '01', })
# sio.emit('read_sn', {'data': '01', })
# sio.emit('read_sn', {'data': '01', })
# for i in range(100):
# sio.emit('read_sn', {'data': '01', })
# time.sleep(1)
# sio.emit('reader_connect', {'sn': '07E2E3070A01411104', 'repair_content': '01', 'repair_man': '01'})
# sio.emit('reader_connect', {'sn': '07E2E3070A01411104', 'repair_content': '01', 'repair_man': '01'})
# sio.emit('send_code', {
# "phone": "15705213512",
# })
sio
.
emit
(
'login'
,
{
#"Authorization": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MzE3MDAzNDkuMjcwMzgzOCwidXNlcl9pZCI6MTYyfQ.VaRMAD4dBgz6_ps5jnMLLWCq3zKBJ32klr3wH1vlWI8",
"code"
:
"2011"
,
"phone"
:
"15052053275"
,
"type"
:
1
})
sio
.
wait
()
sio
.
wait
()
# sio.disconnect()
# sio.
# sio.
poetry.lock
View file @
b7494619
...
@@ -161,6 +161,20 @@ optional = false
...
@@ -161,6 +161,20 @@ optional = false
python-versions = ">=3.6"
python-versions = ">=3.6"
[[package]]
[[package]]
name = "pyjwt"
version = "2.1.0"
description = "JSON Web Token implementation in Python"
category = "main"
optional = false
python-versions = ">=3.6"
[package.extras]
crypto = ["cryptography (>=3.3.1,<4.0.0)"]
dev = ["sphinx", "sphinx-rtd-theme", "zope.interface", "cryptography (>=3.3.1,<4.0.0)", "pytest (>=6.0.0,<7.0.0)", "coverage[toml] (==5.0.4)", "mypy", "pre-commit"]
docs = ["sphinx", "sphinx-rtd-theme", "zope.interface"]
tests = ["pytest (>=6.0.0,<7.0.0)", "coverage[toml] (==5.0.4)"]
[[package]]
name = "pyscard"
name = "pyscard"
version = "2.0.1"
version = "2.0.1"
description = "Smartcard module for Python."
description = "Smartcard module for Python."
...
@@ -286,7 +300,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytes
...
@@ -286,7 +300,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytes
[metadata]
[metadata]
lock-version = "1.1"
lock-version = "1.1"
python-versions = "^3.7"
python-versions = "^3.7"
content-hash = "
6d5d5c246f0ee58a6cd1a42d6e086c0704dda2fb60c07f2d85c7a4922a996d88
"
content-hash = "
c084ca5398387135d6ebe4d6313113bcf74feecf47bc3dee8b56bbaea86e1d0f
"
[metadata.files]
[metadata.files]
bidict = [
bidict = [
...
@@ -393,6 +407,10 @@ markupsafe = [
...
@@ -393,6 +407,10 @@ markupsafe = [
{file = "MarkupSafe-2.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:693ce3f9e70a6cf7d2fb9e6c9d8b204b6b39897a2c4a1aa65728d5ac97dcc1d8"},
{file = "MarkupSafe-2.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:693ce3f9e70a6cf7d2fb9e6c9d8b204b6b39897a2c4a1aa65728d5ac97dcc1d8"},
{file = "MarkupSafe-2.0.1.tar.gz", hash = "sha256:594c67807fb16238b30c44bdf74f36c02cdf22d1c8cda91ef8a0ed8dabf5620a"},
{file = "MarkupSafe-2.0.1.tar.gz", hash = "sha256:594c67807fb16238b30c44bdf74f36c02cdf22d1c8cda91ef8a0ed8dabf5620a"},
]
]
pyjwt = [
{file = "PyJWT-2.1.0-py3-none-any.whl", hash = "sha256:934d73fbba91b0483d3857d1aff50e96b2a892384ee2c17417ed3203f173fca1"},
{file = "PyJWT-2.1.0.tar.gz", hash = "sha256:fba44e7898bbca160a2b2b501f492824fc8382485d3a6f11ba5d0c1937ce6130"},
]
pyscard = [
pyscard = [
{file = "pyscard-2.0.1-cp37-cp37m-win_amd64.whl", hash = "sha256:428027bb17d7bcacc8b315596c45ebc19f40868bec6892a6d71041b9c343c56a"},
{file = "pyscard-2.0.1-cp37-cp37m-win_amd64.whl", hash = "sha256:428027bb17d7bcacc8b315596c45ebc19f40868bec6892a6d71041b9c343c56a"},
{file = "pyscard-2.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:817c71ad5caeb10b33f6ab254ce372e146318937b2f549579abb90a4197088d5"},
{file = "pyscard-2.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:817c71ad5caeb10b33f6ab254ce372e146318937b2f549579abb90a4197088d5"},
...
...
pyproject.toml
View file @
b7494619
...
@@ -13,6 +13,8 @@ python-dotenv = "^0.19.0"
...
@@ -13,6 +13,8 @@ python-dotenv = "^0.19.0"
Flask-SocketIO
=
"^5.1.1"
Flask-SocketIO
=
"^5.1.1"
pyscard
=
"^2.0.1"
pyscard
=
"^2.0.1"
flask-redis
=
"^0.4.0"
flask-redis
=
"^0.4.0"
PyJWT
=
"^2.1.0"
simple-websocket
=
"^0.3.0"
[tool.poetry.dev-dependencies]
[tool.poetry.dev-dependencies]
...
...
utils/jwt_util.py
View file @
b7494619
...
@@ -50,7 +50,8 @@ def verify_jwt(token, secret=None):
...
@@ -50,7 +50,8 @@ def verify_jwt(token, secret=None):
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
import
time
import
time
token
=
generate_jwt
({
"user_id"
:
163
},
time
.
time
()
+
600
,
'qwer1234'
)
token
=
generate_jwt
({
"user_id"
:
162
},
time
.
time
()
+
6000
,
'qwer1234'
)
print
(
token
)
# for i in range(10):
# for i in range(10):
# result = verify_jwt(token, 'secret')
# result = verify_jwt(token, 'secret')
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment