Files
MainSite/app/source/API_Common.py
2024-02-29 01:04:07 +04:00

1177 lines
47 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API с общими функциями и процедурами
Дата последней оптимизации: 22.05.2022
Дата обновления: 26.01.2024
"""
import subprocess
import uuid # Генератор GUID
import datetime # Работа с датой
import time # работа с временем
import os # Работа с файловой системой
import requests # Работа с http/s
# Для Криптографии
import hmac
import base64
import hashlib
import socket # Для проверки доступности портов
from app import app # чтение из config.py
from flask import request # Для получения метаданных о запросе
import sys # Подключаем свои библиотеки
sys.path.append("app/source")
import API_App
appname = app.config['SHMNAME']
NameModule = "API_Common"
################################################################################
class DataSession:
"""Класс для передачи данных о сесссии"""
def __init__(self):
self.status: bool = False # Действует сессия или нет
self.comment: str = ""
self.sessionid: str = ""
self.userid: str = ""
self.username: str = ""
self.stationid: str = ""
################################################################################
def HTML_Progress(Color, IsAnimated: bool, PCur: int, PMax: int, MainDiv: bool = True) -> str:
"""
Создание строки прогресса для HTML
:param Color: error, warning, blue, info, success
:param IsAnimated: True или False
:param PCur: Любое целое число
:param PMax: Любое целое число
:return: str
"""
ProgressColor = ""
if (Color.lower() == "red" or Color.lower() == "danger" or Color.lower() == "error"):
ProgressColor = "bg-danger"
if (Color.lower() == "yellow" or Color.lower() == "warning"):
ProgressColor = "bg-warning"
if (Color.lower() == "green" or Color.lower() == "success"):
ProgressColor = "bg-success"
if (Color.lower() == "info"):
ProgressColor = "bg-info"
if (IsAnimated == True):
ProgressAnim = "progress-bar-animated"
else:
ProgressAnim = ""
if (PMax > 0):
PWidth = int(round(int(PCur) / int(PMax) * 100))
else:
PWidth = 100
PCur = 100
HTML = """<div class="progress-bar progress-bar """
if (MainDiv == True):
HTML = """<div class="progress"><div class="progress-bar progress-bar-striped """
HTML += f"""{ProgressAnim} {ProgressColor}" role="progressbar"
style="width:{PWidth}%" aria-valuenow="{PCur}"
aria-valuemin="0" aria-valuemax="{PMax}"></div>"""
if (MainDiv == True):
HTML += """</div>"""
return HTML
################################################################################
def Now(Format: int = 0):
"""
Текущая дата и время:
* 0 - в секундах (timestamp, unixtime, отчет с 01.01.1970): int
* 1 - ГГГГММДД: str
* 2 - ГГГГ-ММ-ДД: str
* 10 - в миллисекундах (без точки, для уникальности файла или временной таблицы) (timestamp, unixtime, отчет с 01.01.1970): str
* 102 - ГГГГ-ММ-ДД ЧЧ:ММ:СС: str
"""
if (Format == 0):
return int(str(time.mktime(datetime.datetime.now().timetuple())).split('.')[0])
if (Format == 10):
return str(str(time.mktime(datetime.datetime.now().timetuple())).replace(".", ""))
if (Format == 1):
return str(datetime.datetime.now().strftime("%Y%m%d"))
if (Format == 2):
return str(datetime.datetime.now().strftime("%Y-%m-%d"))
if (Format == 102):
return str(datetime.datetime.now()).split('.')[0]
return ""
################################################################################
def SaveLog(Filename, Value, ShowTimeInLog=True):
"""Запись лога в конец"""
CurTime = datetime.datetime.now()
Today = CurTime.strftime("%Y-%m-%d")
# Проверяем и создаем путь для логов
if (os.path.exists("logs") == False):
os.mkdir("logs")
if (os.path.exists("logs") == False):
SaveLog(NameModule, "Не удается создать папку logs!")
return
file = open("logs/" + Filename + "_" + str(Today) + ".log", "a")
if (ShowTimeInLog == True):
file.write(str(CurTime) + ": " + Value + "\r\n")
else:
file.write(Value + "\r\n")
file.close()
if (app.config['SHOWLOG'] == True or GetVariable('Debug') == "1"):
print(str(CurTime) + ": " + Value)
###############################################################################
def validate(date_text: str):
"""
Проверка что значение является датой.
На выходе datetime.datetime
:param date_text: Проверяемая дата
:return datetime.datetime
"""
try:
# Проверка что есть доли секунды
if (date_text.find(".") < 0):
# Нет
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S').strftime("%d.%m.%Y %H:%M:%S")
else:
# Есть
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S.%f').strftime("%d.%m.%Y %H:%M:%S")
except Exception: # as E:
if (date_text == "None"):
return ""
else:
return date_text #
###############################################################################
def SQLEscape(Query: str) -> str:
"""
Экранирование одинарных кавычек для исключения SQL-инъеккции в переменных
:param Query: часть запроса для экранирования
"""
return Query.replace("'", "''")
###############################################################################
def ThreadVars_Message() -> str:
"""Полоска для сообщений на всех страницах"""
return """<div v-if="ThreadVars.message!=''" class="alert" v-bind:class="{'alert-primary':M_Blue,'alert-danger':M_Red, 'alert-success':M_Green, 'alert-warning':M_Yellow}" role="alert" >{{ThreadVars.message}}</div>"""
###############################################################################
def UserHead(Vars):
"""
Чтение заголовка из файла block_head для шаблона
:param Vars
"""
CurStr = ""
if (os.path.exists("app/templates/block_head.htm") == True):
file = open("app/templates/block_head.htm", "r")
# SaveLog(NameModule, "Чтение заголовка")
CurStr = file.read()
file.close()
CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName'])
CurStr = CurStr.replace("{{Title}}", Vars['Title'])
return CurStr
###############################################################################
def UserHeader(SessionID: str, Vars: dict) -> str:
"""
Чтение заголовка из файла block_header для шаблона
:param SessionID: ID-сессии
:param Vars: Массив переменных
"""
CurDataSession: DataSession = CheckSession(SessionID, False)
CurStr = ""
if (os.path.exists("app/templates/block_header.htm") == True):
file = open("app/templates/block_header.htm", "r")
# SaveLog(NameModule, "Чтение заголовка")
CurStr = file.read()
file.close()
CurStr = CurStr.replace("{{IPServer}}", request.host.split(":")[0])
if (request.args.get("Name") is None):
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3])
else:
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3].replace("?Name=", "/"))
CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName'])
CurStr = CurStr.replace("{{Title}}", Vars['Title'])
CurStr = CurStr.replace("{{FIO}}", CurDataSession.username)
Vars.update({"CurStr": CurStr})
Vars.update({"request": request})
return API_App.UserHeader(SessionID, Vars)
###############################################################################
def UserErrorHeader(SessionID: str, Vars: dict) -> str:
"""
Чтение заголовка из файла block_error_header для шаблона
:param SessionID: ID-сессии
:param Vars: Массив переменных
"""
CurDataSession: DataSession = CheckSession(SessionID, False)
CurStr = ""
if (os.path.exists("app/templates/block_error_header.htm") == True):
file = open("app/templates/block_error_header.htm", "r")
# SaveLog(NameModule, "Чтение заголовка")
CurStr = file.read()
file.close()
CurStr = CurStr.replace("{{IPServer}}", request.host.split(":")[0])
if (request.args.get("Name") is None):
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3])
else:
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3].replace("?Name=", "/"))
CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName'])
CurStr = CurStr.replace("{{Title}}", Vars['Title'])
CurStr = CurStr.replace("{{FIO}}", CurDataSession.username)
return CurStr
###############################################################################
def Modals(Vars) -> str:
"""Чтение из файла block_modals для шаблона
:param Vars: Массив переменных. Пока не используется.
"""
CurStr = ""
if (os.path.exists("app/templates/block_modals.htm") == True):
file = open("app/templates/block_modals.htm", "r")
# SaveLog(NameModule, "Чтение заголовка")
CurStr = file.read()
file.close()
return CurStr
###############################################################################
def UpdateSession(SessionID: str):
"""
Процедура продляет существование сессии
"""
if (app.config['SHMNAME'] == 'kmsik'):
return API_App.UpdateSession(SessionID) # Нестандартное обновление сессии
# Если сессия не существует или уже просрочена, то ничего не делаем
CurDataSession: DataSession = CheckSession(SessionID, False)
if (CurDataSession.status == False):
return
if (CurDataSession.sessionid != SessionID):
return
# Продляем проверочную сессию
DBEG = time.mktime(datetime.datetime.now().timetuple())
DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии
# Продляем сессию еще на 15 минут
SaveDataSession(SessionID, "endsession", str(DEND))
###############################################################################
def ReadDataSession(SessionID, Variable, IsByte=False):
"""
Считываем значение переменной сессии
:param SessionID: Сессия
:param Variable: Параметр
:param IsByte: Результат будет двоичные данные или нет
:return При нестандартной ситуации возвращает пустоту
"""
TempVar = ""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
SaveLog(NameModule, "ReadDataSession: Путь не существует: /dev/shm/" + app.config['SHMNAME'] + "")
return ""
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
SaveLog(NameModule, "ReadDataSession: Путь не существует: /dev/shm/" + app.config['SHMNAME'] + "/sessions")
return ""
# Проверяем наличие сессии
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
if (os.path.exists(CurPath + "/" + Variable) == True):
if (IsByte == True):
file = open(CurPath + "/" + Variable, "rb")
else:
file = open(CurPath + "/" + Variable, "r")
TempVar = file.read()
file.close()
return TempVar
else:
SaveLog(NameModule, "ReadDataSession: Переменная не существует: " + CurPath + "/" + Variable)
else:
SaveLog(NameModule, "ReadDataSession: Путь не существует: " + CurPath)
return ""
###############################################################################
def SaveDataSession(SessionID, Variable, Value, IsByte=False) -> bool:
"""
Сохраняем переменную сессии
:param SessionID: Сессия
:param Variable: Параметр
:param Value: Сохраняемое значение
:param IsByte: Сохранить бинарно или нет
:return При нестандартной ситуации возвращает False
"""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
return False
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
return False
# Проверяем наличие сессии
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
if (IsByte == False):
file = open(CurPath + "/" + Variable, "w")
else:
file = open(CurPath + "/" + Variable, "wb")
file.write(Value)
file.close()
return True
return False
###############################################################################
def CheckSession(SessionID, FlagNeedUpdate) -> DataSession:
"""
Процедура проверяет существование сессии
:param SessionID: Сессия
:param FlagNeedUpdate: Требуется ли обновление сессии при ее проверке
:return При существовании сессии .status = True
"""
CurDataSession = DataSession()
# На пустую сессию ничего не возвращать
if (str(SessionID) == "" or SessionID is None):
CurDataSession.comment = "Не передан SessionID"
return CurDataSession
# Текущее время в секундах
DBEG = time.mktime(datetime.datetime.now().timetuple())
CurTime = datetime.datetime.now()
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
CurDataSession.comment = "Не удается создать каталог для сессии"
return CurDataSession
# Проверяем наличие сессии
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
EndSession = ReadDataSession(SessionID, "endsession")
# Если нет информации об окончании сессии пробуем еще раз
if (EndSession == ""):
time.sleep(1)
EndSession = ReadDataSession(SessionID, "endsession")
# Если так нет информации об окончании сессии то пишем ошибку
if (EndSession == ""):
SaveLog(NameModule, "CheckSession: Нет информации об окончании сессии: " + SessionID + ".")
CurDataSession.status = False
CurDataSession.comment = "Нет информации об окончании сессии"
return CurDataSession
# Если сессия уже истекла, то удаляем её
if (float(DBEG) > float(EndSession)):
SaveLog(NameModule, str(CurTime) + ": Сессия " + SessionID + " истекла.")
DeleteSession(SessionID)
CurDataSession.comment = "Сессия истекла"
return CurDataSession
# Если всё ОК то заполняем данные
CurDataSession.userid = str(ReadDataSession(SessionID, "userid"))
CurDataSession.username = str(ReadDataSession(SessionID, "username"))
CurDataSession.stationid = str(ReadDataSession(SessionID, "stationid"))
CurDataSession.sessionid = SessionID
# Продлеваем если необходимо
if (FlagNeedUpdate == True):
UpdateSession(SessionID)
CurDataSession.status = True
CurDataSession.comment = "Сессия существует"
return CurDataSession # возвращаем данные сессии
CurDataSession.comment = "Сессия не существует"
return CurDataSession # возвращаем данные сессии
###############################################################################
def CreateSession(UserID: str, UserName: str, StationID: str = "", SessionID: str = "", IPClient: str = ""):
"""Процедура создает сессию, после успешного определения авторизации
:param UserID
:param UserName
:param StationID: Не обязательный параметр
:param SessionID: Не обязательный параметр
:param IPClient: Не обязательный параметр
"""
if (app.config['SHMNAME'] == 'kmsik'):
return API_App.CreateSession(UserID, UserName, StationID, SessionID, IPClient) # Для КМС-ИК
# Удаление старых сессии
DeleteExpiredSession()
# защита от создания сессии на пустой ID
if (UserID == "" or UserID == "0"):
return ""
# Переменные
DBEG = time.mktime(datetime.datetime.now().timetuple())
DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии
CurTime = datetime.datetime.now()
if (SessionID == ""):
SessionID = "S_" + str(uuid.uuid4())
SaveLog(NameModule, str(CurTime) + ": Создание сессии " + SessionID)
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
# создаем файл для сессии
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
os.mkdir(CurPath)
# Сохраняем данные
SaveLog(NameModule, str(CurTime) + ": Сохраняем данные сессии " + SessionID)
SaveDataSession(SessionID, "ipclient", str(IPClient))
SaveDataSession(SessionID, "userid", str(UserID))
SaveDataSession(SessionID, "username", str(UserName))
SaveDataSession(SessionID, "startsession", str(DEND))
SaveDataSession(SessionID, "endsession", str(DEND))
SaveDataSession(SessionID, "stationid", str(StationID))
SaveLog(NameModule, "Успешный вход в систему: " + UserName)
return SessionID # возвращаем ID сессии
###############################################################################
def CreateSessionVK(UserID, UserName):
"""
Процедура создает сессию, после успешного определения авторизации
:param UserID:
:param UserName:
"""
# Удаление старых сессии
DeleteExpiredSession()
# защита от создания сессии на пустой ID
if (UserID == "" or UserID == "0"):
return ""
# Переменные
DBEG = time.mktime(datetime.datetime.now().timetuple())
DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии
SessionID = str(uuid.uuid4())
CurTime = datetime.datetime.now()
SaveLog(NameModule, str(CurTime) + ": Создание сессии ВК " + SessionID)
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
# создаем файл для сессии
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
os.mkdir(CurPath)
# Сохраняем данные
SaveLog(NameModule, str(CurTime) + ": Сохраняем данные сессии " + SessionID)
SaveDataSession(SessionID, "userid", str(UserID))
SaveDataSession(SessionID, "username", str(UserName))
SaveDataSession(SessionID, "startsession", str(DEND))
SaveDataSession(SessionID, "endsession", str(DEND))
SaveDataSession(SessionID, "fromvk", '1')
SaveLog(NameModule, "Успешный вход в систему из под ВК: " + UserName)
return SessionID # возвращаем ID сессии
###############################################################################
def DeleteExpiredSession():
"""Процедура удаляет из базы истекшие сессии"""
# Текущее время в секундах
DBEG = time.mktime(datetime.datetime.now().timetuple())
CurTime = datetime.datetime.now()
SaveLog(NameModule, str(CurTime) + ": Запуск удаления старых сессии ")
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
# Получаем каталог сессии
for SessionID in os.listdir(f"/dev/shm/{app.config['SHMNAME']}/sessions"):
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
# проверяем не файл ли это
if (os.path.isfile(CurPath) == False):
# проверяем срок сессии
EndSession = ReadDataSession(SessionID, "endsession")
if (EndSession == ""):
DeleteSession(SessionID)
else:
# Если сессия уже истекла, то удаляем её
if (float(DBEG) > float(EndSession)):
SaveLog(NameModule, str(CurTime) + ": Сессия " + SessionID + " истекла.")
DeleteSession(SessionID)
###############################################################################
def DeleteSession(SessionID):
"""Процедура удаляет из базы конкретную сессию"""
CurTime = datetime.datetime.now()
if (SessionID == ""):
SaveLog(NameModule, str(CurTime) + ": Сессия для удаления не указана!")
return False
SaveLog(NameModule, str(CurTime) + ": Удаление сессии " + SessionID)
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
# создаем файл для сессии
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID) == True):
# Удаляем все файлы сессии
for f in os.listdir(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID):
os.remove(f"/dev/shm/{app.config['SHMNAME']}/sessions/{SessionID}/{f}")
# Удаляем папку сессии
os.rmdir(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID)
###############################################################################
def GetVariable(Variable: str) -> str:
"""
Считываем настройку программы
:param Variable: имя переменной
"""
TempVar = ""
# Проверяем наличие сессии
CurPath = "db/Settings/"
# Если работает с Виртуальной памятью, то ищем настройки там
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/Settings") == True):
CurPath = f"/dev/shm/{app.config['SHMNAME']}/Settings/"
if (os.path.exists(CurPath) == True):
if (os.path.exists(CurPath + "/" + Variable) == True):
file = open(CurPath + "/" + Variable, "r")
TempVar = file.read()
file.close()
return TempVar.strip()
return ""
###############################################################################
def SaveVariable(Variable, Value):
"""
Сохраняем переменную от настроек программы
:param Variable: Параметр
:param Value: Сохраняемое значение
"""
# Проверяем пусть
if (os.path.exists("db") == False):
return ""
if (os.path.exists("db/Settings") == False):
return ""
# Сохраняем настройки в программе
CurPath = "db/Settings/"
if (os.path.exists(CurPath) == True):
file = open(CurPath + "/" + Variable, "w")
file.write(str(Value))
file.close()
# Сохраняем настройки в памяти
CurPath = f"/dev/shm/{app.config['SHMNAME']}/Settings/"
if (os.path.exists(CurPath) == True):
file = open(CurPath + "/" + Variable, "w")
file.write(str(Value))
file.close()
return True
return False
###############################################################################
def GetSHA512_FromFile(Filename: str, secret: str) -> str:
"""
Получить хэш на основе секретного ключа
:param Filename: путь к файлу
:param secret:
"""
if (os.path.exists(Filename) == False):
return ""
# Расчет хэша
file = open(Filename, "rb")
CurStr = file.read()
file.close()
return GetSHA512_File(CurStr, secret)
###############################################################################
def GetSHA512_File(message: bytes, secret: str) -> str:
"""
Получить хэш из массива байтов на основе секретного ключа
:param message: bytes
:param secret
"""
h = hmac.new(bytearray(secret, "UTF-8"), message, hashlib.sha512)
return base64.b64encode(h.digest()).decode()
###############################################################################
def GetSHA512_Text(message: str, secret: str) -> str:
"""Получить хэш из текста на основе секретного ключа"""
h = hmac.new(bytearray(secret, "UTF-8"), bytearray(message, "UTF-8"), hashlib.sha512)
return base64.b64encode(h.digest()).decode()
###############################################################################
def GetSHA1_Text(message: str) -> str:
"""Получить хэш из текста на основе секретного ключа. Совместим с хэшем SQL 2005"""
hash_object = hashlib.sha1(bytearray(message, "UTF-8"))
hex_dig = hash_object.hexdigest()
return hex_dig
###############################################################################
def CheckPort(ip, port) -> str:
"""
Проверка порта на его открытие
"OPEN" или "CLOSE"
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
ip = ip.split("\\")[0]
Attempt = 0
LastStatus = ""
SaveLog(NameModule, "Проверка порта на: " + ip + ":" + str(port))
# Проверяем трижды или до 1 успеха
while (Attempt < 3):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
try:
sock.connect((ip, int(port)))
SaveLog(NameModule, 'CheckPort: Порт ' + str(port) + ' открыт.')
# conn.close()
return "OPEN"
except Exception as E:
SaveLog(NameModule, "CheckPort: " + str(E))
# LastStatus = "ERROR"
time.sleep(1)
LastStatus = "CLOSE"
Attempt = Attempt + 1
SaveLog(NameModule, 'CheckPort: Порт ' + str(port) + ' закрыт.')
return LastStatus
###############################################################################
def RenameDataSession(SessionID, OldNameVar, NewNameVar) -> bool:
"""
Переименовываем переменную сессии
"""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
return False
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
return False
# Проверяем наличие сессии
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
os.rename(CurPath + "/" + OldNameVar, CurPath + "/" + NewNameVar)
return True
return False
###############################################################################
def RemoveFromFolder(folder, RemoveSelf=False) -> str:
"""
Удалить всё из указанной папки
* RemoveSelf - Удалить саму папку (уже пустую)
return: Текст ошибки
"""
print('Обработка папки на удаление : ' + folder)
for filename in os.listdir(folder): # Получаем список файлов в папке
file_path = os.path.join(folder, filename) # Полный путь к файлу или папке
try:
if os.path.isfile(file_path): # Если это файл
print('Удаляем файл: ' + file_path)
os.remove(file_path) # Удалить
elif os.path.isdir(file_path): # Если это папка
RemoveFromFolder(file_path) # Удаляем содержимое этой папки (рекурсия)
print('Удаляем папку : ' + file_path)
os.rmdir(file_path) # А затем удалить саму эту папку
except Exception as e:
TextLog = f'Не удалось удалить {file_path}. Причина: {e}'
SaveLog(NameModule, TextLog)
return TextLog
if (RemoveSelf == True):
print('Удаляем папку : ' + folder)
os.rmdir(folder) # А затем удалить саму эту папку
print('Обработка папки на удаление завершена: ' + folder)
return ""
###############################################################################
def RemoveDataSession(SessionID, Variable):
"""
Удаляем переменную сессии
"""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
return False
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
return False
# Проверяем наличие сессии
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
if (os.path.isfile(CurPath + "/" + Variable)):
os.remove(CurPath + "/" + Variable)
return True
return False
########################################################################
def GetSHA1_FromFile(Filename):
"""Хэш для совместимости с MSSQL"""
if (os.path.exists(Filename) == False):
return ""
# Расчет хэша
file = open(Filename, "rb")
CurStr = file.read()
file.close()
hash_object = hashlib.sha1(CurStr)
hex_dig = hash_object.hexdigest()
return hex_dig
########################################################################
def ConvertStrToDate(date_text: str) -> datetime.datetime:
"""
Проверка что строка является датой.
* Поддерживается ГГГГММДД, ГГГГ-ММ-ДД, ДД.ММ.ГГГГ.
* Поддерживается ГГГГ-ММ-ДД ЧЧ:ММ
* Поддерживается ГГГГ-ММ-ДД ЧЧ:ММ:СС
* Поддерживается ГГГГ-ММ-ДД ЧЧ:ММ:СС.мс
* Поддерживается с буквой T вместо пробела.
"""
try:
date_text = date_text.strip()
# Если слишком мало текста для преобразования
# Дата не указана полностью
if (len(date_text) < 8 or len(date_text) == 9):
return None # type: ignore
# нет секунд. Вставляем их.
# print("date_text: " + date_text)
# print(f"len date_text: {len(date_text)}")
# print(f"""date_text.find("T": {date_text.find("T")}""")
if (len(date_text) == 16 and (date_text.find("T") == 10)):
date_text = date_text + ":00"
print("date_text: " + date_text)
# время не указано полностью
if (len(date_text) > 10 and len(date_text) < 19):
return None # type: ignore
# не 1-х и не 2-х тысячалетие
if (len(date_text) == 8 and date_text[:1] not in ('1', '2')):
return None # type: ignore
# ГГГГММДД
if (len(date_text) == 8):
return datetime.datetime.strptime(date_text, '%Y%m%d')
# Если точка идет 3-им символов (для формата ДД.ММ.ГГГГ )
if (date_text.find('.') == 2 or date_text.find('/') == 2 or date_text.find('-') == 2):
date_text = f"{date_text[6:10]}-{date_text[3:5]}-{date_text[0:2]}"
if (len(date_text) == 10):
return datetime.datetime.strptime(date_text.replace(".", "-"), '%Y-%m-%d')
# Если время разделено буквой T
if (date_text.find("T") > 0):
if (date_text.find(".") < 0): # Проверка что есть доли секунды
# Нет
return datetime.datetime.strptime(date_text, '%Y-%m-%dT%H:%M:%S')
else:
# Есть
return datetime.datetime.strptime(date_text, '%Y-%m-%dT%H:%M:%S.%f')
else:
# Проверка что есть доли секунды
if (date_text.find(".") < 0):
# Нет
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S')
else:
# Есть
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S.%f')
except Exception:
if (date_text == "None"):
return None # type: ignore
else:
return date_text # type: ignore
###############################################################################
def ConvertDate(d: datetime.datetime, NewType=102) -> str:
"""
Преобразование даты в указанный формат:
* 102 - ГГГГ-ММ-ДД
* 104 - ДД.ММ.ГГГГ
* 107 - январь 2023
* 108 - Январь 2023
* 109 - ЯНВАРЬ 2023
* 112 - ГГГГММДД
* 122 - ГГГГ-ММ-ДД ЧЧ:ММ:СС
"""
M = {}
M.update({"01": "январь"})
M.update({"02": "февраль"})
M.update({"03": "март"})
M.update({"04": "апрель"})
M.update({"05": "май"})
M.update({"06": "июнь"})
M.update({"07": "июль"})
M.update({"08": "август"})
M.update({"09": "сентябрь"})
M.update({"10": "октябрь"})
M.update({"11": "ноябрь"})
M.update({"12": "декабрь"})
if NewType == 102:
return d.strftime("%Y-%m-%d")
if NewType == 104:
return d.strftime("%d.%m.%Y")
if NewType == 107:
return M[d.strftime("%m")] + " " + d.strftime("%Y")
if NewType == 108:
return M[d.strftime("%m")].capitalize() + " " + d.strftime("%Y")
if NewType == 109:
return M[d.strftime("%m")].upper() + " " + d.strftime("%Y")
if NewType == 112:
return d.strftime("%Y%m%d")
if NewType == 122:
return d.strftime('%Y-%m-%d %H:%M:%S')
return ""
###############################################################################
def GetTZ():
"""Получить текущий часовой пояс"""
now = datetime.datetime.now()
Z = str(now.replace(tzinfo=datetime.timezone.utc) - now.astimezone(datetime.timezone.utc)).split(':')[0]
return int(Z)
###############################################################################
def GetStatusService(ServiceName) -> str:
# Проверка статуса
Result = subprocess.run("systemctl -all | grep " + ServiceName, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
if (str(Result.returncode) == "1"):
return "None"
# Проверка статуса
p = subprocess.Popen(["systemctl", "is-active", ServiceName], stdout=subprocess.PIPE)
(output, err) = p.communicate()
output = output.decode('utf-8')
# print(output)
return output.strip()
###############################################################################
def CreateDBSettings():
"""Создание базы и таблицы настроек"""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("db") == False):
os.mkdir("db")
if (os.path.exists("db/Settings") == False):
os.mkdir("db/Settings")
###############################################################################
def GetFromRequestsT(Url: str, Attempt=1, headers={}) -> str:
"""Получить ответ по HTTP/S ссылке"""
if (headers == {}):
headers = {
'Accept': '*/*',
'Accept-Language': 'ru,en;q=0.9',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
}
# Сайты-исключения где не надо проверять сертификат.
CheckCert = True
# TODO: Узнать бы какой сертификат тут используется.
if (Url.find('opendata.digital.gov.ru') > 0):
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
# При работе с локальными адресами используем свой сертификат
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
CheckCert = "app/static/CA"
# Отправляем GET-запрос на URL
response = requests.get(Url, headers=headers, timeout=600, verify=CheckCert)
# Получаем статус-код ответа
status_code = response.status_code
if (str(status_code)[0] == "5"):
# Если получен код ошибки 502, повторяем запрос до 3 раз
if (Attempt <= 3):
return GetFromRequestsT(Url, Attempt + 1)
else:
return ""
# Возвращаем тело ответа в виде строки
return response.text
###############################################################################
def GetFromRequestsB(Url: str, Attempt=1, headers={}) -> bytes:
"""
Получить ответ по HTTP/S ссылке.
Ответ в байтах. Например, скачаный двоичный файл.
"""
if (headers == {}):
headers = {
'Accept': '*/*',
'Accept-Language': 'ru,en;q=0.9',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
}
# Сайты-исключения где не надо проверять сертификат.
CheckCert = True
# TODO: Узнать бы какой сертификат тут используется.
if (Url.find('opendata.digital.gov.ru') > 0):
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
# При работе с локальными адресами используем свой сертификат
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
CheckCert = "app/static/CA"
# Отправляем GET-запрос на URL
response = requests.get(Url, headers=headers, timeout=600, verify=CheckCert)
# Получаем статус-код ответа
status_code = response.status_code
if (str(status_code)[0] == "5"):
# Если получен код ошибки 502, повторяем запрос до 3 раз
if (Attempt <= 3):
return GetFromRequestsB(Url, Attempt + 1)
else:
return None # type: ignore
# Или в виде байтов
return response.content
###############################################################################
def PostFromRequestsB(Url: str, Data, Attempt=1, headers=None) -> bytes:
"""
Получить ответ по HTTP/S ссылке.
Ответ в байтах. Например, скачаный двоичный файл.
"""
if (headers is None):
headers = {
'Accept': '*/*',
'Accept-Language': 'ru,en;q=0.9',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
}
# Сайты-исключения где не надо проверять сертификат.
CheckCert = True
# TODO: Узнать бы какой сертификат тут используется.
if (Url.find('opendata.digital.gov.ru') > 0):
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
# При работе с локальными адресами используем свой сертификат
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
CheckCert = "app/static/CA"
# Отправляем POST-запрос на URL
response = requests.post(Url, headers=headers, timeout=600, verify=CheckCert, data=Data)
# Получаем статус-код ответа
status_code = response.status_code
if (str(status_code)[0] == "5"):
# Если получен код ошибки 502, повторяем запрос до 3 раз
if (Attempt <= 3):
return PostFromRequestsB(Url, Data, Attempt + 1)
else:
return None
# Или в виде байтов
return response.content
###############################################################################
def PostFromRequestsT(Url: str, data, Attempt=1, headers=None) -> dict:
"""
Получить ответ по HTTP/S ссылке.
Ответ в виде текста.
"""
Answer = {}
if (headers is None):
headers = {
'Accept': '*/*',
'Accept-Language': 'ru,en;q=0.9',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
}
# Сайты-исключения где не надо проверять сертификат.
CheckCert = True
# TODO: Узнать бы какой сертификат тут используется.
if (Url.find('opendata.digital.gov.ru') > 0):
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
# При работе с локальными адресами используем свой сертификат
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
CheckCert = "app/static/CA"
# Отправляем POST-запрос на URL
response = requests.post(Url, headers=headers, timeout=600, verify=CheckCert, data=data)
# Получаем статус-код ответа
status_code = response.status_code
Answer.update({"code": str(status_code)})
Answer.update({"text": response.text})
if (str(status_code)[0] == "5"):
# Если получен код ошибки 502, повторяем запрос до 3 раз
if (Attempt <= 3):
return PostFromRequestsT(Url, data, Attempt + 1)
else:
Answer.update({"status": "error"})
return Answer
if (str(status_code)[0] == "2"):
Answer.update({"status": "ok"})
else:
Answer.update({"status": "error"})
# Или в виде текста
return Answer
###############################################################################
def DayWeek(CurDate: datetime.datetime = datetime.datetime.today()):
"Получить день недели (по умолчанию - сегодня)"
weekday = CurDate.weekday()
return weekday + 1 # Что бы было от 1 до 7
###############################################################################
def SaveFile(Filename, Content, IsBinary=False) -> str:
"""
Сохранить файл с заменой существующего файла.
* Filename - полный путь к файлу.
* Content - содержимое файла.
* IsBinary - является ли содержимое двоичным или нет.
return - пустота если всё хорошо или текст ошибки.
"""
Mode = 'w' # Текстовый
if (IsBinary == True):
Mode = 'wb' # Бинарный
try:
with open(Filename, Mode) as fp:
fp.write(Content)
fp.close()
return '' # Всё хорошо
except Exception as E:
return str(E)
###############################################################################
def NewGUID() -> str:
"Сгененрировать новый GUID"
return str(uuid.uuid4())