#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API с общими функциями и процедурами
Дата последней оптимизации: 22.05.2022
Дата обновления: 26.01.2024
"""
import subprocess
import uuid # Генератор GUID
import datetime # Работа с датой
import time # работа с временем
import os # Работа с файловой системой
import requests # Работа с http/s
# Для Криптографии
import hmac
import base64
import hashlib
import socket # Для проверки доступности портов
from app import app # чтение из config.py
from flask import request # Для получения метаданных о запросе
import sys # Подключаем свои библиотеки
sys.path.append("app/source")
import API_App
appname = app.config['SHMNAME']
NameModule = "API_Common"
################################################################################
class DataSession:
"""Класс для передачи данных о сесссии"""
def __init__(self):
self.status: bool = False # Действует сессия или нет
self.comment: str = ""
self.sessionid: str = ""
self.userid: str = ""
self.username: str = ""
self.stationid: str = ""
################################################################################
def HTML_Progress(Color, IsAnimated: bool, PCur: int, PMax: int, MainDiv: bool = True) -> str:
"""
Создание строки прогресса для HTML
:param Color: error, warning, blue, info, success
:param IsAnimated: True или False
:param PCur: Любое целое число
:param PMax: Любое целое число
:return: str
"""
ProgressColor = ""
if (Color.lower() == "red" or Color.lower() == "danger" or Color.lower() == "error"):
ProgressColor = "bg-danger"
if (Color.lower() == "yellow" or Color.lower() == "warning"):
ProgressColor = "bg-warning"
if (Color.lower() == "green" or Color.lower() == "success"):
ProgressColor = "bg-success"
if (Color.lower() == "info"):
ProgressColor = "bg-info"
if (IsAnimated == True):
ProgressAnim = "progress-bar-animated"
else:
ProgressAnim = ""
if (PMax > 0):
PWidth = int(round(int(PCur) / int(PMax) * 100))
else:
PWidth = 100
PCur = 100
HTML = """
"""
if (MainDiv == True):
HTML += """
"""
return HTML
################################################################################
def Now(Format: int = 0):
"""
Текущая дата и время:
* 0 - в секундах (timestamp, unixtime, отчет с 01.01.1970): int
* 1 - ГГГГММДД: str
* 2 - ГГГГ-ММ-ДД: str
* 10 - в миллисекундах (без точки, для уникальности файла или временной таблицы) (timestamp, unixtime, отчет с 01.01.1970): str
* 102 - ГГГГ-ММ-ДД ЧЧ:ММ:СС: str
"""
if (Format == 0):
return int(str(time.mktime(datetime.datetime.now().timetuple())).split('.')[0])
if (Format == 10):
return str(str(time.mktime(datetime.datetime.now().timetuple())).replace(".", ""))
if (Format == 1):
return str(datetime.datetime.now().strftime("%Y%m%d"))
if (Format == 2):
return str(datetime.datetime.now().strftime("%Y-%m-%d"))
if (Format == 102):
return str(datetime.datetime.now()).split('.')[0]
return ""
################################################################################
def SaveLog(Filename, Value, ShowTimeInLog=True):
"""Запись лога в конец"""
CurTime = datetime.datetime.now()
Today = CurTime.strftime("%Y-%m-%d")
# Проверяем и создаем путь для логов
if (os.path.exists("logs") == False):
os.mkdir("logs")
if (os.path.exists("logs") == False):
SaveLog(NameModule, "Не удается создать папку logs!")
return
file = open("logs/" + Filename + "_" + str(Today) + ".log", "a")
if (ShowTimeInLog == True):
file.write(str(CurTime) + ": " + Value + "\r\n")
else:
file.write(Value + "\r\n")
file.close()
if (app.config['SHOWLOG'] == True or GetVariable('Debug') == "1"):
print(str(CurTime) + ": " + Value)
###############################################################################
def validate(date_text: str):
"""
Проверка что значение является датой.
На выходе datetime.datetime
:param date_text: Проверяемая дата
:return datetime.datetime
"""
try:
# Проверка что есть доли секунды
if (date_text.find(".") < 0):
# Нет
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S').strftime("%d.%m.%Y %H:%M:%S")
else:
# Есть
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S.%f').strftime("%d.%m.%Y %H:%M:%S")
except Exception: # as E:
if (date_text == "None"):
return ""
else:
return date_text #
###############################################################################
def SQLEscape(Query: str) -> str:
"""
Экранирование одинарных кавычек для исключения SQL-инъеккции в переменных
:param Query: часть запроса для экранирования
"""
return Query.replace("'", "''")
###############################################################################
def ThreadVars_Message() -> str:
"""Полоска для сообщений на всех страницах"""
return """{{ThreadVars.message}}
"""
###############################################################################
def UserHead(Vars):
"""
Чтение заголовка из файла block_head для шаблона
:param Vars
"""
CurStr = ""
if (os.path.exists("app/templates/block_head.htm") == True):
file = open("app/templates/block_head.htm", "r")
# SaveLog(NameModule, "Чтение заголовка")
CurStr = file.read()
file.close()
CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName'])
CurStr = CurStr.replace("{{Title}}", Vars['Title'])
return CurStr
###############################################################################
def UserHeader(SessionID: str, Vars: dict) -> str:
"""
Чтение заголовка из файла block_header для шаблона
:param SessionID: ID-сессии
:param Vars: Массив переменных
"""
CurDataSession: DataSession = CheckSession(SessionID, False)
CurStr = ""
if (os.path.exists("app/templates/block_header.htm") == True):
file = open("app/templates/block_header.htm", "r")
# SaveLog(NameModule, "Чтение заголовка")
CurStr = file.read()
file.close()
CurStr = CurStr.replace("{{IPServer}}", request.host.split(":")[0])
if (request.args.get("Name") is None):
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3])
else:
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3].replace("?Name=", "/"))
CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName'])
CurStr = CurStr.replace("{{Title}}", Vars['Title'])
CurStr = CurStr.replace("{{FIO}}", CurDataSession.username)
Vars.update({"CurStr": CurStr})
Vars.update({"request": request})
return API_App.UserHeader(SessionID, Vars)
###############################################################################
def UserErrorHeader(SessionID: str, Vars: dict) -> str:
"""
Чтение заголовка из файла block_error_header для шаблона
:param SessionID: ID-сессии
:param Vars: Массив переменных
"""
CurDataSession: DataSession = CheckSession(SessionID, False)
CurStr = ""
if (os.path.exists("app/templates/block_error_header.htm") == True):
file = open("app/templates/block_error_header.htm", "r")
# SaveLog(NameModule, "Чтение заголовка")
CurStr = file.read()
file.close()
CurStr = CurStr.replace("{{IPServer}}", request.host.split(":")[0])
if (request.args.get("Name") is None):
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3])
else:
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3].replace("?Name=", "/"))
CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName'])
CurStr = CurStr.replace("{{Title}}", Vars['Title'])
CurStr = CurStr.replace("{{FIO}}", CurDataSession.username)
return CurStr
###############################################################################
def Modals(Vars) -> str:
"""Чтение из файла block_modals для шаблона
:param Vars: Массив переменных. Пока не используется.
"""
CurStr = ""
if (os.path.exists("app/templates/block_modals.htm") == True):
file = open("app/templates/block_modals.htm", "r")
# SaveLog(NameModule, "Чтение заголовка")
CurStr = file.read()
file.close()
return CurStr
###############################################################################
def UpdateSession(SessionID: str):
"""
Процедура продляет существование сессии
"""
if (app.config['SHMNAME'] == 'kmsik'):
return API_App.UpdateSession(SessionID) # Нестандартное обновление сессии
# Если сессия не существует или уже просрочена, то ничего не делаем
CurDataSession: DataSession = CheckSession(SessionID, False)
if (CurDataSession.status == False):
return
if (CurDataSession.sessionid != SessionID):
return
# Продляем проверочную сессию
DBEG = time.mktime(datetime.datetime.now().timetuple())
DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии
# Продляем сессию еще на 15 минут
SaveDataSession(SessionID, "endsession", str(DEND))
###############################################################################
def ReadDataSession(SessionID, Variable, IsByte=False):
"""
Считываем значение переменной сессии
:param SessionID: Сессия
:param Variable: Параметр
:param IsByte: Результат будет двоичные данные или нет
:return При нестандартной ситуации возвращает пустоту
"""
TempVar = ""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
SaveLog(NameModule, "ReadDataSession: Путь не существует: /dev/shm/" + app.config['SHMNAME'] + "")
return ""
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
SaveLog(NameModule, "ReadDataSession: Путь не существует: /dev/shm/" + app.config['SHMNAME'] + "/sessions")
return ""
# Проверяем наличие сессии
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
if (os.path.exists(CurPath + "/" + Variable) == True):
if (IsByte == True):
file = open(CurPath + "/" + Variable, "rb")
else:
file = open(CurPath + "/" + Variable, "r")
TempVar = file.read()
file.close()
return TempVar
else:
SaveLog(NameModule, "ReadDataSession: Переменная не существует: " + CurPath + "/" + Variable)
else:
SaveLog(NameModule, "ReadDataSession: Путь не существует: " + CurPath)
return ""
###############################################################################
def SaveDataSession(SessionID, Variable, Value, IsByte=False) -> bool:
"""
Сохраняем переменную сессии
:param SessionID: Сессия
:param Variable: Параметр
:param Value: Сохраняемое значение
:param IsByte: Сохранить бинарно или нет
:return При нестандартной ситуации возвращает False
"""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
return False
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
return False
# Проверяем наличие сессии
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
if (IsByte == False):
file = open(CurPath + "/" + Variable, "w")
else:
file = open(CurPath + "/" + Variable, "wb")
file.write(Value)
file.close()
return True
return False
###############################################################################
def CheckSession(SessionID, FlagNeedUpdate) -> DataSession:
"""
Процедура проверяет существование сессии
:param SessionID: Сессия
:param FlagNeedUpdate: Требуется ли обновление сессии при ее проверке
:return При существовании сессии .status = True
"""
CurDataSession = DataSession()
# На пустую сессию ничего не возвращать
if (str(SessionID) == "" or SessionID is None):
CurDataSession.comment = "Не передан SessionID"
return CurDataSession
# Текущее время в секундах
DBEG = time.mktime(datetime.datetime.now().timetuple())
CurTime = datetime.datetime.now()
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
CurDataSession.comment = "Не удается создать каталог для сессии"
return CurDataSession
# Проверяем наличие сессии
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
EndSession = ReadDataSession(SessionID, "endsession")
# Если нет информации об окончании сессии пробуем еще раз
if (EndSession == ""):
time.sleep(1)
EndSession = ReadDataSession(SessionID, "endsession")
# Если так нет информации об окончании сессии то пишем ошибку
if (EndSession == ""):
SaveLog(NameModule, "CheckSession: Нет информации об окончании сессии: " + SessionID + ".")
CurDataSession.status = False
CurDataSession.comment = "Нет информации об окончании сессии"
return CurDataSession
# Если сессия уже истекла, то удаляем её
if (float(DBEG) > float(EndSession)):
SaveLog(NameModule, str(CurTime) + ": Сессия " + SessionID + " истекла.")
DeleteSession(SessionID)
CurDataSession.comment = "Сессия истекла"
return CurDataSession
# Если всё ОК то заполняем данные
CurDataSession.userid = str(ReadDataSession(SessionID, "userid"))
CurDataSession.username = str(ReadDataSession(SessionID, "username"))
CurDataSession.stationid = str(ReadDataSession(SessionID, "stationid"))
CurDataSession.sessionid = SessionID
# Продлеваем если необходимо
if (FlagNeedUpdate == True):
UpdateSession(SessionID)
CurDataSession.status = True
CurDataSession.comment = "Сессия существует"
return CurDataSession # возвращаем данные сессии
CurDataSession.comment = "Сессия не существует"
return CurDataSession # возвращаем данные сессии
###############################################################################
def CreateSession(UserID: str, UserName: str, StationID: str = "", SessionID: str = "", IPClient: str = ""):
"""Процедура создает сессию, после успешного определения авторизации
:param UserID
:param UserName
:param StationID: Не обязательный параметр
:param SessionID: Не обязательный параметр
:param IPClient: Не обязательный параметр
"""
if (app.config['SHMNAME'] == 'kmsik'):
return API_App.CreateSession(UserID, UserName, StationID, SessionID, IPClient) # Для КМС-ИК
# Удаление старых сессии
DeleteExpiredSession()
# защита от создания сессии на пустой ID
if (UserID == "" or UserID == "0"):
return ""
# Переменные
DBEG = time.mktime(datetime.datetime.now().timetuple())
DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии
CurTime = datetime.datetime.now()
if (SessionID == ""):
SessionID = "S_" + str(uuid.uuid4())
SaveLog(NameModule, str(CurTime) + ": Создание сессии " + SessionID)
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
# создаем файл для сессии
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
os.mkdir(CurPath)
# Сохраняем данные
SaveLog(NameModule, str(CurTime) + ": Сохраняем данные сессии " + SessionID)
SaveDataSession(SessionID, "ipclient", str(IPClient))
SaveDataSession(SessionID, "userid", str(UserID))
SaveDataSession(SessionID, "username", str(UserName))
SaveDataSession(SessionID, "startsession", str(DEND))
SaveDataSession(SessionID, "endsession", str(DEND))
SaveDataSession(SessionID, "stationid", str(StationID))
SaveLog(NameModule, "Успешный вход в систему: " + UserName)
return SessionID # возвращаем ID сессии
###############################################################################
def CreateSessionVK(UserID, UserName):
"""
Процедура создает сессию, после успешного определения авторизации
:param UserID:
:param UserName:
"""
# Удаление старых сессии
DeleteExpiredSession()
# защита от создания сессии на пустой ID
if (UserID == "" or UserID == "0"):
return ""
# Переменные
DBEG = time.mktime(datetime.datetime.now().timetuple())
DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии
SessionID = str(uuid.uuid4())
CurTime = datetime.datetime.now()
SaveLog(NameModule, str(CurTime) + ": Создание сессии ВК " + SessionID)
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
# создаем файл для сессии
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
os.mkdir(CurPath)
# Сохраняем данные
SaveLog(NameModule, str(CurTime) + ": Сохраняем данные сессии " + SessionID)
SaveDataSession(SessionID, "userid", str(UserID))
SaveDataSession(SessionID, "username", str(UserName))
SaveDataSession(SessionID, "startsession", str(DEND))
SaveDataSession(SessionID, "endsession", str(DEND))
SaveDataSession(SessionID, "fromvk", '1')
SaveLog(NameModule, "Успешный вход в систему из под ВК: " + UserName)
return SessionID # возвращаем ID сессии
###############################################################################
def DeleteExpiredSession():
"""Процедура удаляет из базы истекшие сессии"""
# Текущее время в секундах
DBEG = time.mktime(datetime.datetime.now().timetuple())
CurTime = datetime.datetime.now()
SaveLog(NameModule, str(CurTime) + ": Запуск удаления старых сессии ")
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
# Получаем каталог сессии
for SessionID in os.listdir(f"/dev/shm/{app.config['SHMNAME']}/sessions"):
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
# проверяем не файл ли это
if (os.path.isfile(CurPath) == False):
# проверяем срок сессии
EndSession = ReadDataSession(SessionID, "endsession")
if (EndSession == ""):
DeleteSession(SessionID)
else:
# Если сессия уже истекла, то удаляем её
if (float(DBEG) > float(EndSession)):
SaveLog(NameModule, str(CurTime) + ": Сессия " + SessionID + " истекла.")
DeleteSession(SessionID)
###############################################################################
def DeleteSession(SessionID):
"""Процедура удаляет из базы конкретную сессию"""
CurTime = datetime.datetime.now()
if (SessionID == ""):
SaveLog(NameModule, str(CurTime) + ": Сессия для удаления не указана!")
return False
SaveLog(NameModule, str(CurTime) + ": Удаление сессии " + SessionID)
# Проверяем и создаем путь для локальных сессий
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
# создаем файл для сессии
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID) == True):
# Удаляем все файлы сессии
for f in os.listdir(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID):
os.remove(f"/dev/shm/{app.config['SHMNAME']}/sessions/{SessionID}/{f}")
# Удаляем папку сессии
os.rmdir(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID)
###############################################################################
def GetVariable(Variable: str) -> str:
"""
Считываем настройку программы
:param Variable: имя переменной
"""
TempVar = ""
# Проверяем наличие сессии
CurPath = "db/Settings/"
# Если работает с Виртуальной памятью, то ищем настройки там
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/Settings") == True):
CurPath = f"/dev/shm/{app.config['SHMNAME']}/Settings/"
if (os.path.exists(CurPath) == True):
if (os.path.exists(CurPath + "/" + Variable) == True):
file = open(CurPath + "/" + Variable, "r")
TempVar = file.read()
file.close()
return TempVar.strip()
return ""
###############################################################################
def SaveVariable(Variable, Value):
"""
Сохраняем переменную от настроек программы
:param Variable: Параметр
:param Value: Сохраняемое значение
"""
# Проверяем пусть
if (os.path.exists("db") == False):
return ""
if (os.path.exists("db/Settings") == False):
return ""
# Сохраняем настройки в программе
CurPath = "db/Settings/"
if (os.path.exists(CurPath) == True):
file = open(CurPath + "/" + Variable, "w")
file.write(str(Value))
file.close()
# Сохраняем настройки в памяти
CurPath = f"/dev/shm/{app.config['SHMNAME']}/Settings/"
if (os.path.exists(CurPath) == True):
file = open(CurPath + "/" + Variable, "w")
file.write(str(Value))
file.close()
return True
return False
###############################################################################
def GetSHA512_FromFile(Filename: str, secret: str) -> str:
"""
Получить хэш на основе секретного ключа
:param Filename: путь к файлу
:param secret:
"""
if (os.path.exists(Filename) == False):
return ""
# Расчет хэша
file = open(Filename, "rb")
CurStr = file.read()
file.close()
return GetSHA512_File(CurStr, secret)
###############################################################################
def GetSHA512_File(message: bytes, secret: str) -> str:
"""
Получить хэш из массива байтов на основе секретного ключа
:param message: bytes
:param secret
"""
h = hmac.new(bytearray(secret, "UTF-8"), message, hashlib.sha512)
return base64.b64encode(h.digest()).decode()
###############################################################################
def GetSHA512_Text(message: str, secret: str) -> str:
"""Получить хэш из текста на основе секретного ключа"""
h = hmac.new(bytearray(secret, "UTF-8"), bytearray(message, "UTF-8"), hashlib.sha512)
return base64.b64encode(h.digest()).decode()
###############################################################################
def GetSHA1_Text(message: str) -> str:
"""Получить хэш из текста на основе секретного ключа. Совместим с хэшем SQL 2005"""
hash_object = hashlib.sha1(bytearray(message, "UTF-8"))
hex_dig = hash_object.hexdigest()
return hex_dig
###############################################################################
def CheckPort(ip, port) -> str:
"""
Проверка порта на его открытие
"OPEN" или "CLOSE"
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
ip = ip.split("\\")[0]
Attempt = 0
LastStatus = ""
SaveLog(NameModule, "Проверка порта на: " + ip + ":" + str(port))
# Проверяем трижды или до 1 успеха
while (Attempt < 3):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
try:
sock.connect((ip, int(port)))
SaveLog(NameModule, 'CheckPort: Порт ' + str(port) + ' открыт.')
# conn.close()
return "OPEN"
except Exception as E:
SaveLog(NameModule, "CheckPort: " + str(E))
# LastStatus = "ERROR"
time.sleep(1)
LastStatus = "CLOSE"
Attempt = Attempt + 1
SaveLog(NameModule, 'CheckPort: Порт ' + str(port) + ' закрыт.')
return LastStatus
###############################################################################
def RenameDataSession(SessionID, OldNameVar, NewNameVar) -> bool:
"""
Переименовываем переменную сессии
"""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
return False
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
return False
# Проверяем наличие сессии
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
os.rename(CurPath + "/" + OldNameVar, CurPath + "/" + NewNameVar)
return True
return False
###############################################################################
def RemoveFromFolder(folder, RemoveSelf=False) -> str:
"""
Удалить всё из указанной папки
* RemoveSelf - Удалить саму папку (уже пустую)
return: Текст ошибки
"""
print('Обработка папки на удаление : ' + folder)
for filename in os.listdir(folder): # Получаем список файлов в папке
file_path = os.path.join(folder, filename) # Полный путь к файлу или папке
try:
if os.path.isfile(file_path): # Если это файл
print('Удаляем файл: ' + file_path)
os.remove(file_path) # Удалить
elif os.path.isdir(file_path): # Если это папка
RemoveFromFolder(file_path) # Удаляем содержимое этой папки (рекурсия)
print('Удаляем папку : ' + file_path)
os.rmdir(file_path) # А затем удалить саму эту папку
except Exception as e:
TextLog = f'Не удалось удалить {file_path}. Причина: {e}'
SaveLog(NameModule, TextLog)
return TextLog
if (RemoveSelf == True):
print('Удаляем папку : ' + folder)
os.rmdir(folder) # А затем удалить саму эту папку
print('Обработка папки на удаление завершена: ' + folder)
return ""
###############################################################################
def RemoveDataSession(SessionID, Variable):
"""
Удаляем переменную сессии
"""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
return False
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
return False
# Проверяем наличие сессии
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
if (os.path.exists(CurPath) == True):
if (os.path.isfile(CurPath + "/" + Variable)):
os.remove(CurPath + "/" + Variable)
return True
return False
########################################################################
def GetSHA1_FromFile(Filename):
"""Хэш для совместимости с MSSQL"""
if (os.path.exists(Filename) == False):
return ""
# Расчет хэша
file = open(Filename, "rb")
CurStr = file.read()
file.close()
hash_object = hashlib.sha1(CurStr)
hex_dig = hash_object.hexdigest()
return hex_dig
########################################################################
def ConvertStrToDate(date_text: str) -> datetime.datetime:
"""
Проверка что строка является датой.
* Поддерживается ГГГГММДД, ГГГГ-ММ-ДД, ДД.ММ.ГГГГ.
* Поддерживается ГГГГ-ММ-ДД ЧЧ:ММ
* Поддерживается ГГГГ-ММ-ДД ЧЧ:ММ:СС
* Поддерживается ГГГГ-ММ-ДД ЧЧ:ММ:СС.мс
* Поддерживается с буквой T вместо пробела.
"""
try:
date_text = date_text.strip()
# Если слишком мало текста для преобразования
# Дата не указана полностью
if (len(date_text) < 8 or len(date_text) == 9):
return None # type: ignore
# нет секунд. Вставляем их.
# print("date_text: " + date_text)
# print(f"len date_text: {len(date_text)}")
# print(f"""date_text.find("T": {date_text.find("T")}""")
if (len(date_text) == 16 and (date_text.find("T") == 10)):
date_text = date_text + ":00"
print("date_text: " + date_text)
# время не указано полностью
if (len(date_text) > 10 and len(date_text) < 19):
return None # type: ignore
# не 1-х и не 2-х тысячалетие
if (len(date_text) == 8 and date_text[:1] not in ('1', '2')):
return None # type: ignore
# ГГГГММДД
if (len(date_text) == 8):
return datetime.datetime.strptime(date_text, '%Y%m%d')
# Если точка идет 3-им символов (для формата ДД.ММ.ГГГГ )
if (date_text.find('.') == 2 or date_text.find('/') == 2 or date_text.find('-') == 2):
date_text = f"{date_text[6:10]}-{date_text[3:5]}-{date_text[0:2]}"
if (len(date_text) == 10):
return datetime.datetime.strptime(date_text.replace(".", "-"), '%Y-%m-%d')
# Если время разделено буквой T
if (date_text.find("T") > 0):
if (date_text.find(".") < 0): # Проверка что есть доли секунды
# Нет
return datetime.datetime.strptime(date_text, '%Y-%m-%dT%H:%M:%S')
else:
# Есть
return datetime.datetime.strptime(date_text, '%Y-%m-%dT%H:%M:%S.%f')
else:
# Проверка что есть доли секунды
if (date_text.find(".") < 0):
# Нет
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S')
else:
# Есть
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S.%f')
except Exception:
if (date_text == "None"):
return None # type: ignore
else:
return date_text # type: ignore
###############################################################################
def ConvertDate(d: datetime.datetime, NewType=102) -> str:
"""
Преобразование даты в указанный формат:
* 102 - ГГГГ-ММ-ДД
* 104 - ДД.ММ.ГГГГ
* 107 - январь 2023
* 108 - Январь 2023
* 109 - ЯНВАРЬ 2023
* 112 - ГГГГММДД
* 122 - ГГГГ-ММ-ДД ЧЧ:ММ:СС
"""
M = {}
M.update({"01": "январь"})
M.update({"02": "февраль"})
M.update({"03": "март"})
M.update({"04": "апрель"})
M.update({"05": "май"})
M.update({"06": "июнь"})
M.update({"07": "июль"})
M.update({"08": "август"})
M.update({"09": "сентябрь"})
M.update({"10": "октябрь"})
M.update({"11": "ноябрь"})
M.update({"12": "декабрь"})
if NewType == 102:
return d.strftime("%Y-%m-%d")
if NewType == 104:
return d.strftime("%d.%m.%Y")
if NewType == 107:
return M[d.strftime("%m")] + " " + d.strftime("%Y")
if NewType == 108:
return M[d.strftime("%m")].capitalize() + " " + d.strftime("%Y")
if NewType == 109:
return M[d.strftime("%m")].upper() + " " + d.strftime("%Y")
if NewType == 112:
return d.strftime("%Y%m%d")
if NewType == 122:
return d.strftime('%Y-%m-%d %H:%M:%S')
return ""
###############################################################################
def GetTZ():
"""Получить текущий часовой пояс"""
now = datetime.datetime.now()
Z = str(now.replace(tzinfo=datetime.timezone.utc) - now.astimezone(datetime.timezone.utc)).split(':')[0]
return int(Z)
###############################################################################
def GetStatusService(ServiceName) -> str:
# Проверка статуса
Result = subprocess.run("systemctl -all | grep " + ServiceName, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
if (str(Result.returncode) == "1"):
return "None"
# Проверка статуса
p = subprocess.Popen(["systemctl", "is-active", ServiceName], stdout=subprocess.PIPE)
(output, err) = p.communicate()
output = output.decode('utf-8')
# print(output)
return output.strip()
###############################################################################
def CreateDBSettings():
"""Создание базы и таблицы настроек"""
# Проверяем и создаем путь для локальных сессий
if (os.path.exists("db") == False):
os.mkdir("db")
if (os.path.exists("db/Settings") == False):
os.mkdir("db/Settings")
###############################################################################
def GetFromRequestsT(Url: str, Attempt=1, headers={}) -> str:
"""Получить ответ по HTTP/S ссылке"""
if (headers == {}):
headers = {
'Accept': '*/*',
'Accept-Language': 'ru,en;q=0.9',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
}
# Сайты-исключения где не надо проверять сертификат.
CheckCert = True
# TODO: Узнать бы какой сертификат тут используется.
if (Url.find('opendata.digital.gov.ru') > 0):
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
# При работе с локальными адресами используем свой сертификат
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
CheckCert = "app/static/CA"
# Отправляем GET-запрос на URL
response = requests.get(Url, headers=headers, timeout=600, verify=CheckCert)
# Получаем статус-код ответа
status_code = response.status_code
if (str(status_code)[0] == "5"):
# Если получен код ошибки 502, повторяем запрос до 3 раз
if (Attempt <= 3):
return GetFromRequestsT(Url, Attempt + 1)
else:
return ""
# Возвращаем тело ответа в виде строки
return response.text
###############################################################################
def GetFromRequestsB(Url: str, Attempt=1, headers={}) -> bytes:
"""
Получить ответ по HTTP/S ссылке.
Ответ в байтах. Например, скачаный двоичный файл.
"""
if (headers == {}):
headers = {
'Accept': '*/*',
'Accept-Language': 'ru,en;q=0.9',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
}
# Сайты-исключения где не надо проверять сертификат.
CheckCert = True
# TODO: Узнать бы какой сертификат тут используется.
if (Url.find('opendata.digital.gov.ru') > 0):
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
# При работе с локальными адресами используем свой сертификат
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
CheckCert = "app/static/CA"
# Отправляем GET-запрос на URL
response = requests.get(Url, headers=headers, timeout=600, verify=CheckCert)
# Получаем статус-код ответа
status_code = response.status_code
if (str(status_code)[0] == "5"):
# Если получен код ошибки 502, повторяем запрос до 3 раз
if (Attempt <= 3):
return GetFromRequestsB(Url, Attempt + 1)
else:
return None # type: ignore
# Или в виде байтов
return response.content
###############################################################################
def PostFromRequestsB(Url: str, Data, Attempt=1, headers=None) -> bytes:
"""
Получить ответ по HTTP/S ссылке.
Ответ в байтах. Например, скачаный двоичный файл.
"""
if (headers is None):
headers = {
'Accept': '*/*',
'Accept-Language': 'ru,en;q=0.9',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
}
# Сайты-исключения где не надо проверять сертификат.
CheckCert = True
# TODO: Узнать бы какой сертификат тут используется.
if (Url.find('opendata.digital.gov.ru') > 0):
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
# При работе с локальными адресами используем свой сертификат
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
CheckCert = "app/static/CA"
# Отправляем POST-запрос на URL
response = requests.post(Url, headers=headers, timeout=600, verify=CheckCert, data=Data)
# Получаем статус-код ответа
status_code = response.status_code
if (str(status_code)[0] == "5"):
# Если получен код ошибки 502, повторяем запрос до 3 раз
if (Attempt <= 3):
return PostFromRequestsB(Url, Data, Attempt + 1)
else:
return None
# Или в виде байтов
return response.content
###############################################################################
def PostFromRequestsT(Url: str, data, Attempt=1, headers=None) -> dict:
"""
Получить ответ по HTTP/S ссылке.
Ответ в виде текста.
"""
Answer = {}
if (headers is None):
headers = {
'Accept': '*/*',
'Accept-Language': 'ru,en;q=0.9',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
}
# Сайты-исключения где не надо проверять сертификат.
CheckCert = True
# TODO: Узнать бы какой сертификат тут используется.
if (Url.find('opendata.digital.gov.ru') > 0):
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
# При работе с локальными адресами используем свой сертификат
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
CheckCert = "app/static/CA"
# Отправляем POST-запрос на URL
response = requests.post(Url, headers=headers, timeout=600, verify=CheckCert, data=data)
# Получаем статус-код ответа
status_code = response.status_code
Answer.update({"code": str(status_code)})
Answer.update({"text": response.text})
if (str(status_code)[0] == "5"):
# Если получен код ошибки 502, повторяем запрос до 3 раз
if (Attempt <= 3):
return PostFromRequestsT(Url, data, Attempt + 1)
else:
Answer.update({"status": "error"})
return Answer
if (str(status_code)[0] == "2"):
Answer.update({"status": "ok"})
else:
Answer.update({"status": "error"})
# Или в виде текста
return Answer
###############################################################################
def DayWeek(CurDate: datetime.datetime = datetime.datetime.today()):
"Получить день недели (по умолчанию - сегодня)"
weekday = CurDate.weekday()
return weekday + 1 # Что бы было от 1 до 7
###############################################################################
def SaveFile(Filename, Content, IsBinary=False) -> str:
"""
Сохранить файл с заменой существующего файла.
* Filename - полный путь к файлу.
* Content - содержимое файла.
* IsBinary - является ли содержимое двоичным или нет.
return - пустота если всё хорошо или текст ошибки.
"""
Mode = 'w' # Текстовый
if (IsBinary == True):
Mode = 'wb' # Бинарный
try:
with open(Filename, Mode) as fp:
fp.write(Content)
fp.close()
return '' # Всё хорошо
except Exception as E:
return str(E)
###############################################################################
def NewGUID() -> str:
"Сгененрировать новый GUID"
return str(uuid.uuid4())