Commit f4e121b6 by Aeolus

Merge remote-tracking branch 'origin/yanglei'

parents 64ccae6b 384ed4cf
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
@author:Aeolus
@file: __init__.py
@function:
@modify:
"""
from flask import Flask
from flask_cors import CORS
from flask_log_request_id import RequestID
from dotenv import load_dotenv
from models.base_model import db
from utils.my_redis_cache import redis_client
from utils.mylogger import set_logger
def create_app(config_name):
from config.env_path_config import env_path
load_dotenv(dotenv_path=env_path, verbose=True, override=True)
set_logger()
app = Flask("management")
from config.app_config import config
app.config.from_object(config[config_name])
CORS(app)
db.init_app(app)
redis_client.init_app(app)
RequestID(app)
from utils.middlewares import jwt_authentication, log_enter_interface, log_out_interface, close_db_session, \
get_platform, all_options_pass
app.before_request(log_enter_interface)
app.before_request(all_options_pass)
app.before_request(get_platform)
app.before_request(jwt_authentication)
app.after_request(log_out_interface)
app.after_request(close_db_session)
# todo register blueprint
from myapps.management.api import register_management_blueprint
register_management_blueprint(app)
return app
#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@version:
author:Aeolus
@file: __init__.py.py
"""
from flask import Flask
from myapps.management.api.rent_query import rent_query_route
from myapps.management.api.machine_management import machine_query_route
def register_management_blueprint(app: Flask):
prefix = "/management"
app.register_blueprint(rent_query_route, url_prefix=prefix + "/rent")
app.register_blueprint(machine_query_route, url_prefix=prefix + "/machine")
\ No newline at end of file
#!usr/bin/env python
# -*- coding:utf-8 _*-
import json
import logging
import re
import time
from flask import Blueprint, request, jsonify, g
from models.base_model import db
from utils.my_response import BaseResponse
from models.models import Machine, Hatch
from utils.error_code import MACHINE_NOT_EXIST_ERROR
logger = logging.getLogger(__name__)
# 创建蓝图
machine_query_route = Blueprint('machine', __name__)
# 展示机器信息
@machine_query_route.route("show",methods=["post"])
def machine_show():
json_date = request.get_json()
page = json_date["page"]
page_size = json_date["page_size"]
# 展示机器信息
m_count = Machine.query.count()
machine_result = Machine.query.offset((page-1)*page_size).limit(page_size).all()
if not machine_result:
return BaseResponse(data=[], page=page, page_size=page_size)
result_date = []
for result in machine_result:
res = {
"machine_no": result.machine_no,
"address": result.address,
"short_address": result.short_address,
"hatch_number": result.hatch_number,
"status": result.status
}
result_date.append(res)
return BaseResponse(data=result_date, page=page, page_size=page_size,page_cout=m_count//page_size+1)
# 模糊查询到机器信息
@machine_query_route.route("query_machine",methods=["post"])
def query_machine():
json_date = request.get_json()
number = json_date["number"]
page = json_date["page"]
page_size = json_date["page_size"]
pass
# 修改机器信息
@machine_query_route.route("query_edit",methods=["post"])
def query_edit():
json_date = request.get_json()
machine_no = json_date["machine_no"]
address = json_date["address"]
short_address = json_date["short_address"]
status = json_date["status"]
machine_not = Machine.query.filter_by(machine_no=machine_no).first()
if not machine_not:
return MACHINE_NOT_EXIST_ERROR
machine_result = Machine.query.filter_by(machine_no=machine_no).update(
{"address": address, "short_address": short_address, "status": status}
)
db.session.commit()
return BaseResponse()
# 显出机柜数量
@machine_query_route.route("hatch_query",methods=["post"])
def hatch_query():
json_date = request.get_json()
machine_no = json_date["machine_no"]
machine_not = Machine.query.filter_by(machine_no=machine_no).first()
if not machine_not:
return MACHINE_NOT_EXIST_ERROR
machine_result = Hatch.query.filter_by(machine_no=machine_no).all()
result_date = []
for res in machine_result:
result_date.append(res.hatch_no)
return BaseResponse(data=result_date)
# 查询信息
@machine_query_route.route("query", methods=["post"])
def query():
json_date = request.get_json()
number = str(json_date["number"])
page = json_date["page"]
page_size = json_date["page_size"]
machine_not = Machine.query.filter(Machine.machine_no.like("%"+number+"%")).first()
if not machine_not:
add_not = Machine.query.filter(Machine.address.like("%"+number+"%")).first()
if not add_not:
return BaseResponse(data=[])
else:
add_result = Machine.query.filter(Machine.address.like("%"+number+"%")).offset((page-1)*page_size).limit(
page_size
).all()
result_date = []
for add in add_result:
date = {
"machine_no": add.machine_no,
"address": add.address,
"short_address": add.short_address,
"hatch_number": add.hatch_number,
"status": add.status
}
result_date.append(date)
return BaseResponse(data=result_date, page=page, page_size=page_size)
else:
machine_result = Machine.query.filter(Machine.machine_no.like("%"+number+"%")).offset((page-1)*page_size).\
limit(page_size).all()
result_date = []
for machine in machine_result:
date = {
"machine_no": machine.machine_no,
"address": machine.address,
"short_address": machine.short_address,
"hatch_number": machine.hatch_number,
"status": machine.status
}
result_date.append(date)
return BaseResponse(data=result_date, page=page, page_size=page_size)
#!usr/bin/env python
# -*- coding:utf-8 _*-
import json
import logging
import re
import time
from config.wechat_config import platform_appid_config_list, pay_config_list, NFC_PAY_CALLBCK_URL
from flask import Blueprint, request, jsonify, g
from models.base_model import db
from utils.my_response import BaseResponse
from utils.error_code import NO_RENT_RECORD
from models.models import Rent
from service.rent_service import RentService
from service.wechat_service import WeChatPayService
logger = logging.getLogger(__name__)
rent_query_route = Blueprint('rent', __name__) # 创建蓝图
# 查询订单
@rent_query_route.route("part",methods=["post"])
def rent_query():
json_date = request.get_json()
get_number =str(json_date["number"])
# 订单状态
is_pay = json_date["is_pay"]
add_time = json_date["add_time"]
pay_time = json_date["pay_time"]
page = json_date["page"]
page_size = json_date["page_size"]
if is_pay == 1 or is_pay == 0: # 完成或未完成订单
if add_time == "" or pay_time == "": # 没有时间筛选
machine_result = Rent.query.filter(Rent.machine_no.like("%"+get_number+"%"),is_pay=is_pay).offset((page-1)*page_size)\
.limit(page_size).all()
else: # 开始进行模糊查询以及时间筛选
query1 = "SELECT * from rent where concat(rent.machine_no,rent.user_id,rent.rent_no) like '{}' and add_time > '{}' AND pay_time < '{}' and is_pay={} limit {},{}".format(
"%"+get_number+"%", add_time, pay_time, is_pay, (page-1)*page_size, page*page_size
)
machine_result = db.session.execute(query1).fetchall()
else: # 已支付或没有支付
if add_time == "" or pay_time == "": # 没有时间筛选
machine_result = Rent.query.filter(Rent.machine_no.like("%"+get_number+"%")).offset((page-1)*page_size)\
.limit(page_size).all()
else: # 开始进行模糊查询以及时间筛选
query1 = "SELECT * from rent where concat(rent.machine_no,rent.user_id,rent.rent_no) like '{}' and add_time > '{}' AND pay_time < '{}' limit {},{}".format(
"%" + get_number + "%", add_time, pay_time,(page-1)*page_size, page*page_size
)
machine_result = db.session.execute(query1).fetchall()
result_date = []
# 将数据遍历和筛选
for i in machine_result:
date = {
"rent_no ": i.rent_no,
"machine_no": i.machine_no,
"user_id": i.user_id,
"place_id": i.place_id,
"agent_total": i.agent_total,
"is_pay": i.is_pay,
"add_time": i.add_time.strftime("%Y-%m-%d %H:%M:%S"),
"pay_time": i.pay_time.strftime("%Y-%m-%d %H:%M:%S")
}
result_date.append(date)
return BaseResponse(date=result_date, page=page, page_size=page_size)
# 点击详情按钮,显示订单信息
@rent_query_route.route("datails",methods=["post"])
def details():
# 根据订单号来获取信息
json_date = request.get_json()
rent_no = json_date["rent_no"]
rent_not = Rent.query.filter(rent_no=rent_no).first()
if not rent_not:
return BaseResponse(data=[])
rent_result = Rent.query.filter(rent_no=rent_no).all()
result_date = []
for result in rent_result:
date = {
"rent_no": result.rent_no,
"machine_no": result.machine_no,
"user_id": result.user_id,
"place_id": result.place_id,
"back_money": result.back_money,
"is_pay": result.is_pay,
"rent_pay": result.rent_pay,
"add_time": result.add_time,
"pay_time": result.pay_time,
"over_time": result.over_time
}
result_date.append(date)
return BaseResponse(date=result_date)
# 进行订单修改并进行退款
@rent_query_route.route("edit_s",methods=["post"])
def edit_s():
json_date = request.get_json()
rent_no = json_date["rent_no"]
is_pay = json_date["is_pay"]
rent = Rent.query.filter_by(rent_no=rent_no, is_pay=1)
if not rent:
return NO_RENT_RECORD
# 退款操作
data = {
"out_refund_no": RentService.create_refund_no(),
"out_trade_no": rent.rent_no,
"total_fee": rent.pay_money,
"refund_fee": rent.pay_money
}
result = WeChatPayService(app_id=platform_appid_config_list[g.user.platform],
config_name=pay_config_list[rent.mch_platform]).do_refund(data)
if result:
rent_refund = Rent()
rent_refund.refund_no = data["out_refund_no"]
rent_refund.rent_no = rent_no
rent_refund.fee = data["refund_fee"]
rent.status = 3
db.session.add(rent_refund)
db.session.commit()
return BaseResponse()
#!usr/bin/.env python
# -*- coding:utf-8 _*-
"""
@version:
@author:Aeolus
"""
import os
import logging
from myapps.management import create_app
logger = logging.getLogger(__name__)
app = create_app(os.getenv('RUN_ENV', 'prod'))
logger.info("run server")
if __name__ == '__main__':
app.run('127.0.0.1', 8893, debug=False)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment