Commit e6908117 by yanglei

login

parent 8d365ac4
......@@ -355,3 +355,32 @@ class WxUser(Base):
comment='上次登录时间')
created_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
class Management(Base):
__tablename__ = 'management_login'
id = Column(INTEGER(10), primary_key=True, unique=True)
user_no = Column(String(25, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
user_name = Column(String(255, 'utf8mb4_unicode_ci'), nullable=False)
phone = Column(String(255, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
key = Column(String(255, 'utf8mb4_unicode_ci'), nullable=False, unique=True)
level = Column(INTEGER(1), nullable=False, comment='1:补货员')
status = Column(INTEGER(1), nullable=False, comment='1:正常 2:删除')
_password_hash_ = Column(String(255, 'utf8mb4_unicode_ci'))
last_login = Column(DateTime)
expire_time = Column(DateTime)
created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
@property
def password(self):
raise Exception('密码不能被读取') # 为了保持使用习惯,还是设置一个password字段用来设置密码,当然也不能被读取。
# 赋值password,则自动加密存储。
@password.setter
def password(self, value):
self._password_hash_ = generate_password_hash(value)
# 使用check_password,进行密码校验,返回True False。
def check_password(self, pasword):
return check_password_hash(self._password_hash_, pasword)
......@@ -9,6 +9,7 @@ from flask import Flask
from myapps.management.api.rent_query import rent_query_route
from myapps.management.api.machine_management import machine_query_route
from myapps.management.api.login import login_route
......@@ -16,3 +17,4 @@ def register_management_blueprint(app: Flask):
prefix = "/management"
app.register_blueprint(rent_query_route, url_prefix=prefix + "/rent")
app.register_blueprint(machine_query_route, url_prefix=prefix + "/machine")
app.register_blueprint(login_route, url_prefix=prefix + "/login")
#!usr/bin/env python
# -*- coding:utf-8 _*-
import json
import logging
import re
import time
from flask import Blueprint, request, jsonify, g
from models.base_model import db
from utils.my_response import BaseResponse
from models.models import Management
from utils.error_code import PASSWORD_ERROR
from service.make_token import generate_token,certify_token
logger = logging.getLogger(__name__)
# 创建蓝图
login_route = Blueprint('login', __name__)
#登录
@login_route.route("login_phone",methods=["post"])
def login():
json_date = request.get_json()
number = json_date["number"]
password = json_date['password']
key = json_date["key"]
#手机号登录
phone_result = Management.query.filter_by(phone=number, status=1).first()
if phone_result:
# 进行密码验证
if phone_result.check_password(password) == True:
token_making = generate_token(key, 360)
# 从获取库里获取key
ky = phone_result.key
return BaseResponse(data=certify_token(ky, token_making))
else:
return BaseResponse(**PASSWORD_ERROR)
#用户id登录
user_result = Management.query.filter_by(user_no=number, status=1).first()
if user_result:
# 进行密码验证
if user_result.check_password(password) == True:
token_making = generate_token(key, 360)
# 从获取库里获取key
ky = user_result.key
return BaseResponse(data=certify_token(ky, token_making))
else:
return BaseResponse(**PASSWORD_ERROR)
return BaseResponse(date="无法登录,用户id或手机号错误")
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import datetime
import logging
import time
......
# coding: utf-8
import time
import base64
import hmac
def generate_token(key, expire=3600):
r'''
@Args:
key: str (用户给定的key,需要用户保存以便之后验证token,每次产生token时的key 都可以是同一个key)
expire: int(最大有效时间,单位为s)
@Return:
state: str
'''
ts_str = str(time.time() + expire)
ts_byte = ts_str.encode("utf-8")
sha1_tshexstr = hmac.new(key.encode("utf-8"),ts_byte,'sha1').hexdigest()
token = ts_str+':'+sha1_tshexstr
b64_token = base64.urlsafe_b64encode(token.encode("utf-8"))
return b64_token.decode("utf-8")
def certify_token(key, token):
r'''
@Args:
key: str
token: str
@Returns:
boolean
'''
token_str = base64.urlsafe_b64decode(token).decode('utf-8')
token_list = token_str.split(':')
if len(token_list) != 2:
return False
ts_str = token_list[0]
if float(ts_str) < time.time():
# token expired
return False
known_sha1_tsstr = token_list[1]
sha1 = hmac.new(key.encode("utf-8"),ts_str.encode('utf-8'),'sha1')
calc_sha1_tsstr = sha1.hexdigest()
if calc_sha1_tsstr != known_sha1_tsstr:
# token certification failed
return False
# token certification success
return True
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment