#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ API с общими функциями и процедурами Дата последней оптимизации: 22.05.2022 Дата обновления: 11.01.2023 """ import subprocess import uuid # Генератор GUID import datetime # Работа с датой import time # работа с временем import os # Работа с файловой системой import requests # Работа с http/s # Для Криптографии import hmac import base64 import hashlib import socket # Для проверки доступности портов from app import app # чтение из config.py from flask import request # Для получения метаданных о запросе import sys # Подключаем свои библиотеки sys.path.append("app/source") import API_App appname = app.config['SHMNAME'] NameModule = "API_Common" ################################################################################ class DataSession: """Класс для передачи данных о сесссии""" def __init__(self): self.status: bool = False # Действует сессия или нет self.comment: str = "" self.sessionid: str = "" self.userid: str = "" self.username: str = "" self.stationid: str = "" ################################################################################ def HTML_Progress(Color, IsAnimated: bool, PCur: int, PMax: int,) -> str: """ Создание строки прогресса для HTML :param Color: error, warning, blue, info, success :param IsAnimated: True или False :param PCur: Любое целое число :param PMax: Любое целое число :return: str """ ProgressColor = "" if (Color.lower() == "red" or Color.lower() == "danger" or Color.lower() == "error"): ProgressColor = "bg-danger" if (Color.lower() == "yellow" or Color.lower() == "warning"): ProgressColor = "bg-warning" if (Color.lower() == "green" or Color.lower() == "success"): ProgressColor = "bg-success" if (Color.lower() == "info"): ProgressColor = "bg-info" if (IsAnimated == True): ProgressAnim = "progress-bar-animated" else: ProgressAnim = "" if (PMax > 0): PWidth = int(round(int(PCur) / int(PMax) * 100)) else: PWidth = 100 PCur = 100 return f"""
""" ################################################################################ def Now(Format: int = 0): """ Текущая дата и время: * 0 - в секундах (timestamp, unixtime, отчет с 01.01.1970): int * 1 - ГГГГММДД: str * 2 - ГГГГ-ММ-ДД: str * 10 - в миллисекундах (без точки, для уникальности файла или временной таблицы) (timestamp, unixtime, отчет с 01.01.1970): str * 102 - ГГГГ-ММ-ДД ЧЧ:ММ:СС: str """ if (Format == 0): return int(str(time.mktime(datetime.datetime.now().timetuple())).split('.')[0]) if (Format == 10): return str(str(time.mktime(datetime.datetime.now().timetuple())).replace(".", "")) if (Format == 1): return str(datetime.datetime.now().strftime("%Y%m%d")) if (Format == 2): return str(datetime.datetime.now().strftime("%Y-%m-%d")) if (Format == 102): return str(datetime.datetime.now()).split('.')[0] return "" ################################################################################ def SaveLog(Filename, Value, ShowTimeInLog=True): """Запись лога в конец""" CurTime = datetime.datetime.now() Today = CurTime.strftime("%Y-%m-%d") # Проверяем и создаем путь для логов if (os.path.exists("logs") == False): os.mkdir("logs") if (os.path.exists("logs") == False): SaveLog(NameModule, "Не удается создать папку logs!") return file = open("logs/" + Filename + "_" + str(Today) + ".log", "a") if (ShowTimeInLog == True): file.write(str(CurTime) + ": " + Value + "\r\n") else: file.write(Value + "\r\n") file.close() if (app.config['SHOWLOG'] == True or GetVariable('Debug') == "1"): print(str(CurTime) + ": " + Value) ############################################################################### def validate(date_text: str): """ Проверка что значение является датой. На выходе datetime.datetime :param date_text: Проверяемая дата :return datetime.datetime """ try: # Проверка что есть доли секунды if (date_text.find(".") < 0): # Нет return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S').strftime("%d.%m.%Y %H:%M:%S") else: # Есть return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S.%f').strftime("%d.%m.%Y %H:%M:%S") except Exception: # as E: if (date_text == "None"): return "" else: return date_text # ############################################################################### def SQLEscape(Query: str) -> str: """ Экранирование одинарных кавычек для исключения SQL-инъеккции в переменных :param Query: часть запроса для экранирования """ return Query.replace("'", "''") ############################################################################### def ThreadVars_Message() -> str: """Полоска для сообщений на всех страницах""" return """""" ############################################################################### def UserHead(Vars): """ Чтение заголовка из файла block_head для шаблона :param Vars """ CurStr = "" if (os.path.exists("app/templates/block_head.htm") == True): file = open("app/templates/block_head.htm", "r") # SaveLog(NameModule, "Чтение заголовка") CurStr = file.read() file.close() CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName']) CurStr = CurStr.replace("{{Title}}", Vars['Title']) return CurStr ############################################################################### def UserHeader(SessionID: str, Vars: dict) -> str: """ Чтение заголовка из файла block_header для шаблона :param SessionID: ID-сессии :param Vars: Массив переменных """ CurDataSession: DataSession = CheckSession(SessionID, False) CurStr = "" if (os.path.exists("app/templates/block_header.htm") == True): file = open("app/templates/block_header.htm", "r") # SaveLog(NameModule, "Чтение заголовка") CurStr = file.read() file.close() CurStr = CurStr.replace("{{IPServer}}", request.host.split(":")[0]) if (request.args.get("Name") is None): CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3]) else: CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3].replace("?Name=", "/")) CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName']) CurStr = CurStr.replace("{{Title}}", Vars['Title']) CurStr = CurStr.replace("{{FIO}}", CurDataSession.username) Vars.update({"CurStr": CurStr}) Vars.update({"request": request}) return API_App.UserHeader(SessionID, Vars) ############################################################################### def UserErrorHeader(SessionID: str, Vars: dict) -> str: """ Чтение заголовка из файла block_error_header для шаблона :param SessionID: ID-сессии :param Vars: Массив переменных """ CurDataSession: DataSession = CheckSession(SessionID, False) CurStr = "" if (os.path.exists("app/templates/block_error_header.htm") == True): file = open("app/templates/block_error_header.htm", "r") # SaveLog(NameModule, "Чтение заголовка") CurStr = file.read() file.close() CurStr = CurStr.replace("{{IPServer}}", request.host.split(":")[0]) if (request.args.get("Name") is None): CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3]) else: CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3].replace("?Name=", "/")) CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName']) CurStr = CurStr.replace("{{Title}}", Vars['Title']) CurStr = CurStr.replace("{{FIO}}", CurDataSession.username) return CurStr ############################################################################### def Modals(Vars) -> str: """Чтение из файла block_modals для шаблона :param Vars: Массив переменных. Пока не используется. """ CurStr = "" if (os.path.exists("app/templates/block_modals.htm") == True): file = open("app/templates/block_modals.htm", "r") # SaveLog(NameModule, "Чтение заголовка") CurStr = file.read() file.close() return CurStr ############################################################################### def UpdateSession(SessionID: str): """ Процедура продляет существование сессии """ if (app.config['SHMNAME'] == 'kmsik'): return API_App.UpdateSession(SessionID) # Нестандартное обновление сессии # Если сессия не существует или уже просрочена, то ничего не делаем CurDataSession: DataSession = CheckSession(SessionID, False) if (CurDataSession.status == False): return if (CurDataSession.sessionid != SessionID): return # Продляем проверочную сессию DBEG = time.mktime(datetime.datetime.now().timetuple()) DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии # Продляем сессию еще на 15 минут SaveDataSession(SessionID, "endsession", str(DEND)) ############################################################################### def ReadDataSession(SessionID, Variable, IsByte=False): """ Считываем значение переменной сессии :param SessionID: Сессия :param Variable: Параметр :param IsByte: Результат будет двоичные данные или нет :return При нестандартной ситуации возвращает пустоту """ TempVar = "" # Проверяем и создаем путь для локальных сессий if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False): SaveLog(NameModule, "ReadDataSession: Путь не существует: /dev/shm/" + app.config['SHMNAME'] + "") return "" if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False): SaveLog(NameModule, "ReadDataSession: Путь не существует: /dev/shm/" + app.config['SHMNAME'] + "/sessions") return "" # Проверяем наличие сессии CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID if (os.path.exists(CurPath) == True): if (os.path.exists(CurPath + "/" + Variable) == True): if (IsByte == True): file = open(CurPath + "/" + Variable, "rb") else: file = open(CurPath + "/" + Variable, "r") TempVar = file.read() file.close() return TempVar else: SaveLog(NameModule, "ReadDataSession: Переменная не существует: " + CurPath + "/" + Variable) else: SaveLog(NameModule, "ReadDataSession: Путь не существует: " + CurPath) return "" ############################################################################### def SaveDataSession(SessionID, Variable, Value, IsByte=False) -> bool: """ Сохраняем переменную сессии :param SessionID: Сессия :param Variable: Параметр :param Value: Сохраняемое значение :param IsByte: Сохранить бинарно или нет :return При нестандартной ситуации возвращает False """ # Проверяем и создаем путь для локальных сессий if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False): return False if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False): return False # Проверяем наличие сессии CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID if (os.path.exists(CurPath) == True): if (IsByte == False): file = open(CurPath + "/" + Variable, "w") else: file = open(CurPath + "/" + Variable, "wb") file.write(Value) file.close() return True return False ############################################################################### def CheckSession(SessionID, FlagNeedUpdate) -> DataSession: """ Процедура проверяет существование сессии :param SessionID: Сессия :param FlagNeedUpdate: Требуется ли обновление сессии при ее проверке :return При существовании сессии .status = True """ CurDataSession = DataSession() # На пустую сессию ничего не возвращать if (str(SessionID) == "" or SessionID is None): CurDataSession.comment = "Не передан SessionID" return CurDataSession # Текущее время в секундах DBEG = time.mktime(datetime.datetime.now().timetuple()) CurTime = datetime.datetime.now() # Проверяем и создаем путь для локальных сессий if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False): os.mkdir(f"/dev/shm/{app.config['SHMNAME']}") if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False): CurDataSession.comment = "Не удается создать каталог для сессии" return CurDataSession # Проверяем наличие сессии CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID if (os.path.exists(CurPath) == True): EndSession = ReadDataSession(SessionID, "endsession") # Если нет информации об окончании сессии пробуем еще раз if (EndSession == ""): time.sleep(1) EndSession = ReadDataSession(SessionID, "endsession") # Если так нет информации об окончании сессии то пишем ошибку if (EndSession == ""): SaveLog(NameModule, "CheckSession: Нет информации об окончании сессии: " + SessionID + ".") CurDataSession.status = False CurDataSession.comment = "Нет информации об окончании сессии" return CurDataSession # Если сессия уже истекла, то удаляем её if (float(DBEG) > float(EndSession)): SaveLog(NameModule, str(CurTime) + ": Сессия " + SessionID + " истекла.") DeleteSession(SessionID) CurDataSession.comment = "Сессия истекла" return CurDataSession # Если всё ОК то заполняем данные CurDataSession.userid = str(ReadDataSession(SessionID, "userid")) CurDataSession.username = str(ReadDataSession(SessionID, "username")) CurDataSession.stationid = str(ReadDataSession(SessionID, "stationid")) CurDataSession.sessionid = SessionID # Продлеваем если необходимо if (FlagNeedUpdate == True): UpdateSession(SessionID) CurDataSession.status = True CurDataSession.comment = "Сессия существует" return CurDataSession # возвращаем данные сессии CurDataSession.comment = "Сессия не существует" return CurDataSession # возвращаем данные сессии ############################################################################### def CreateSession(UserID: str, UserName: str, StationID: str = "", SessionID: str = "", IPClient: str = ""): """Процедура создает сессию, после успешного определения авторизации :param UserID :param UserName :param StationID: Не обязательный параметр :param SessionID: Не обязательный параметр :param IPClient: Не обязательный параметр """ if (app.config['SHMNAME'] == 'kmsik'): return API_App.CreateSession(UserID, UserName, StationID, SessionID, IPClient) # Для КМС-ИК # Удаление старых сессии DeleteExpiredSession() # защита от создания сессии на пустой ID if (UserID == "" or UserID == "0"): return "" # Переменные DBEG = time.mktime(datetime.datetime.now().timetuple()) DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии CurTime = datetime.datetime.now() if (SessionID == ""): SessionID = "S_" + str(uuid.uuid4()) SaveLog(NameModule, str(CurTime) + ": Создание сессии " + SessionID) # Проверяем и создаем путь для локальных сессий if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False): os.mkdir(f"/dev/shm/{app.config['SHMNAME']}") if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False): os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions") # создаем файл для сессии CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID os.mkdir(CurPath) # Сохраняем данные SaveLog(NameModule, str(CurTime) + ": Сохраняем данные сессии " + SessionID) SaveDataSession(SessionID, "ipclient", str(IPClient)) SaveDataSession(SessionID, "userid", str(UserID)) SaveDataSession(SessionID, "username", str(UserName)) SaveDataSession(SessionID, "startsession", str(DEND)) SaveDataSession(SessionID, "endsession", str(DEND)) SaveDataSession(SessionID, "stationid", str(StationID)) SaveLog(NameModule, "Успешный вход в систему: " + UserName) return SessionID # возвращаем ID сессии ############################################################################### def CreateSessionVK(UserID, UserName): """ Процедура создает сессию, после успешного определения авторизации :param UserID: :param UserName: """ # Удаление старых сессии DeleteExpiredSession() # защита от создания сессии на пустой ID if (UserID == "" or UserID == "0"): return "" # Переменные DBEG = time.mktime(datetime.datetime.now().timetuple()) DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии SessionID = str(uuid.uuid4()) CurTime = datetime.datetime.now() SaveLog(NameModule, str(CurTime) + ": Создание сессии ВК " + SessionID) # Проверяем и создаем путь для локальных сессий if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False): os.mkdir(f"/dev/shm/{app.config['SHMNAME']}") if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False): os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions") # создаем файл для сессии CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID os.mkdir(CurPath) # Сохраняем данные SaveLog(NameModule, str(CurTime) + ": Сохраняем данные сессии " + SessionID) SaveDataSession(SessionID, "userid", str(UserID)) SaveDataSession(SessionID, "username", str(UserName)) SaveDataSession(SessionID, "startsession", str(DEND)) SaveDataSession(SessionID, "endsession", str(DEND)) SaveDataSession(SessionID, "fromvk", '1') SaveLog(NameModule, "Успешный вход в систему из под ВК: " + UserName) return SessionID # возвращаем ID сессии ############################################################################### def DeleteExpiredSession(): """Процедура удаляет из базы истекшие сессии""" # Текущее время в секундах DBEG = time.mktime(datetime.datetime.now().timetuple()) CurTime = datetime.datetime.now() SaveLog(NameModule, str(CurTime) + ": Запуск удаления старых сессии ") # Проверяем и создаем путь для локальных сессий if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False): os.mkdir(f"/dev/shm/{app.config['SHMNAME']}") if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False): os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions") # Получаем каталог сессии for SessionID in os.listdir(f"/dev/shm/{app.config['SHMNAME']}/sessions"): CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID # проверяем не файл ли это if (os.path.isfile(CurPath) == False): # проверяем срок сессии EndSession = ReadDataSession(SessionID, "endsession") if (EndSession == ""): DeleteSession(SessionID) else: # Если сессия уже истекла, то удаляем её if (float(DBEG) > float(EndSession)): SaveLog(NameModule, str(CurTime) + ": Сессия " + SessionID + " истекла.") DeleteSession(SessionID) ############################################################################### def DeleteSession(SessionID): """Процедура удаляет из базы конкретную сессию""" CurTime = datetime.datetime.now() if (SessionID == ""): SaveLog(NameModule, str(CurTime) + ": Сессия для удаления не указана!") return False SaveLog(NameModule, str(CurTime) + ": Удаление сессии " + SessionID) # Проверяем и создаем путь для локальных сессий if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False): os.mkdir(f"/dev/shm/{app.config['SHMNAME']}") if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False): os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions") # создаем файл для сессии if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID) == True): # Удаляем все файлы сессии for f in os.listdir(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID): os.remove(f"/dev/shm/{app.config['SHMNAME']}/sessions/{SessionID}/{f}") # Удаляем папку сессии os.rmdir(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID) ############################################################################### def GetVariable(Variable: str) -> str: """ Считываем настройку программы :param Variable: имя переменной """ TempVar = "" # Проверяем наличие сессии CurPath = "db/Settings/" # Если работает с Виртуальной памятью, то ищем настройки там if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/Settings") == True): CurPath = f"/dev/shm/{app.config['SHMNAME']}/Settings/" if (os.path.exists(CurPath) == True): if (os.path.exists(CurPath + "/" + Variable) == True): file = open(CurPath + "/" + Variable, "r") TempVar = file.read() file.close() return TempVar.strip() return "" ############################################################################### def SaveVariable(Variable, Value): """ Сохраняем переменную от настроек программы :param Variable: Параметр :param Value: Сохраняемое значение """ # Проверяем пусть if (os.path.exists("db") == False): return "" if (os.path.exists("db/Settings") == False): return "" # Сохраняем настройки в программе CurPath = "db/Settings/" if (os.path.exists(CurPath) == True): file = open(CurPath + "/" + Variable, "w") file.write(str(Value)) file.close() # Сохраняем настройки в памяти CurPath = f"/dev/shm/{app.config['SHMNAME']}/Settings/" if (os.path.exists(CurPath) == True): file = open(CurPath + "/" + Variable, "w") file.write(str(Value)) file.close() return True return False ############################################################################### def GetSHA512_FromFile(Filename: str, secret: str) -> str: """ Получить хэш на основе секретного ключа :param Filename: путь к файлу :param secret: """ if (os.path.exists(Filename) == False): return "" # Расчет хэша file = open(Filename, "rb") CurStr = file.read() file.close() return GetSHA512_File(CurStr, secret) ############################################################################### def GetSHA512_File(message: bytes, secret: str) -> str: """ Получить хэш из массива байтов на основе секретного ключа :param message: bytes :param secret """ h = hmac.new(bytearray(secret, "UTF-8"), message, hashlib.sha512) return base64.b64encode(h.digest()).decode() ############################################################################### def GetSHA512_Text(message: str, secret: str) -> str: """Получить хэш из текста на основе секретного ключа""" h = hmac.new(bytearray(secret, "UTF-8"), bytearray(message, "UTF-8"), hashlib.sha512) return base64.b64encode(h.digest()).decode() ############################################################################### def GetSHA1_Text(message: str) -> str: """Получить хэш из текста на основе секретного ключа. Совместим с хэшем SQL 2005""" hash_object = hashlib.sha1(bytearray(message, "UTF-8")) hex_dig = hash_object.hexdigest() return hex_dig ############################################################################### def CheckPort(ip, port) -> str: """ Проверка порта на его открытие "OPEN" или "CLOSE" """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) ip = ip.split("\\")[0] Attempt = 0 LastStatus = "" SaveLog(NameModule, "Проверка порта на: " + ip + ":" + str(port)) # Проверяем трижды или до 1 успеха while (Attempt < 3): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) try: sock.connect((ip, int(port))) SaveLog(NameModule, 'CheckPort: Порт ' + str(port) + ' открыт.') # conn.close() return "OPEN" except Exception as E: SaveLog(NameModule, "CheckPort: " + str(E)) # LastStatus = "ERROR" time.sleep(1) LastStatus = "CLOSE" Attempt = Attempt + 1 SaveLog(NameModule, 'CheckPort: Порт ' + str(port) + ' закрыт.') return LastStatus ############################################################################### def RenameDataSession(SessionID, OldNameVar, NewNameVar) -> bool: """ Переименовываем переменную сессии """ # Проверяем и создаем путь для локальных сессий if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False): return False if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False): return False # Проверяем наличие сессии CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID if (os.path.exists(CurPath) == True): os.rename(CurPath + "/" + OldNameVar, CurPath + "/" + NewNameVar) return True return False ############################################################################### def RemoveFromFolder(folder, RemoveSelf=False) -> str: """ Удалить всё из указанной папки * RemoveSelf - Удалить саму папку (уже пустую) return: Текст ошибки """ print('Обработка папки на удаление : ' + folder) for filename in os.listdir(folder): # Получаем список файлов в папке file_path = os.path.join(folder, filename) # Полный путь к файлу или папке try: if os.path.isfile(file_path): # Если это файл print('Удаляем файл: ' + file_path) os.remove(file_path) # Удалить elif os.path.isdir(file_path): # Если это папка RemoveFromFolder(file_path) # Удаляем содержимое этой папки (рекурсия) print('Удаляем папку : ' + file_path) os.rmdir(file_path) # А затем удалить саму эту папку except Exception as e: TextLog = f'Не удалось удалить {file_path}. Причина: {e}' SaveLog(NameModule, TextLog) return TextLog if (RemoveSelf == True): print('Удаляем папку : ' + folder) os.rmdir(folder) # А затем удалить саму эту папку print('Обработка папки на удаление завершена: ' + folder) return "" ############################################################################### def RemoveDataSession(SessionID, Variable): """ Удаляем переменную сессии """ # Проверяем и создаем путь для локальных сессий if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False): return False if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False): return False # Проверяем наличие сессии CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID if (os.path.exists(CurPath) == True): if (os.path.isfile(CurPath + "/" + Variable)): os.remove(CurPath + "/" + Variable) return True return False ######################################################################## def GetSHA1_FromFile(Filename): """Хэш для совместимости с MSSQL""" if (os.path.exists(Filename) == False): return "" # Расчет хэша file = open(Filename, "rb") CurStr = file.read() file.close() hash_object = hashlib.sha1(CurStr) hex_dig = hash_object.hexdigest() return hex_dig ######################################################################## def ConvertStrToDate(date_text: str) -> datetime.datetime: """ Проверка что строка является датой. Поддерживается ГГГГММДД, ГГГГ-ММ-ДД, ДД.ММ.ГГГГ. """ try: date_text = date_text.strip() # Если слишком мало текста для преобразования # Дата не указана полностью if (len(date_text) < 8 or len(date_text) == 9): return None # type: ignore # время не указано полностью if (len(date_text) > 10 and len(date_text) < 19): return None # type: ignore # не 1-х и не 2-х тысячалетие if (len(date_text) == 8 and date_text[:1] not in ('1', '2')): return None # type: ignore # ГГГГММДД if (len(date_text) == 8): return datetime.datetime.strptime(date_text, '%Y%m%d') # Если точка идет 3-им символов (для формата ДД.ММ.ГГГГ ) if (date_text.find('.') == 2 or date_text.find('/') == 2 or date_text.find('-') == 2): date_text = f"{date_text[6:10]}-{date_text[3:5]}-{date_text[0:2]}" if (len(date_text) == 10): return datetime.datetime.strptime(date_text.replace(".", "-"), '%Y-%m-%d') # Проверка что есть доли секунды if (date_text.find(".") < 0): # Нет return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S') else: # Есть return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S.%f') except Exception: if (date_text == "None"): return None # type: ignore else: return date_text # type: ignore ############################################################################### def ConvertDate(d: datetime.datetime, NewType=102) -> str: """ Преобразование даты в указанный формат 102 - ГГГГ-ММ-ДД 104 - ДД.ММ.ГГГГ 107 - январь 2023 108 - Январь 2023 109 - ЯНВАРЬ 2023 """ M = {} M.update({"01": "январь"}) M.update({"02": "февраль"}) M.update({"03": "март"}) M.update({"04": "апрель"}) M.update({"05": "май"}) M.update({"06": "июнь"}) M.update({"07": "июль"}) M.update({"08": "август"}) M.update({"09": "сентябрь"}) M.update({"10": "октябрь"}) M.update({"11": "ноябрь"}) M.update({"12": "декабрь"}) if NewType == 112: return d.strftime("%Y%m%d") if NewType == 102: return d.strftime("%Y-%m-%d") if NewType == 104: return d.strftime("%d.%m.%Y") if NewType == 107: return M[d.strftime("%m")] + " " + d.strftime("%Y") if NewType == 108: return M[d.strftime("%m")].capitalize() + " " + d.strftime("%Y") if NewType == 109: return M[d.strftime("%m")].upper() + " " + d.strftime("%Y") return "" ############################################################################### def GetTZ(): """Получить текущий часовой пояс""" now = datetime.datetime.now() Z = str(now.replace(tzinfo=datetime.timezone.utc) - now.astimezone(datetime.timezone.utc)).split(':')[0] return int(Z) ############################################################################### def GetStatusService(ServiceName) -> str: # Проверка статуса Result = subprocess.run("systemctl -all | grep " + ServiceName, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) if (str(Result.returncode) == "1"): return "None" # Проверка статуса p = subprocess.Popen(["systemctl", "is-active", ServiceName], stdout=subprocess.PIPE) (output, err) = p.communicate() output = output.decode('utf-8') # print(output) return output.strip() ############################################################################### def CreateDBSettings(): """Создание базы и таблицы настроек""" # Проверяем и создаем путь для локальных сессий if (os.path.exists("db") == False): os.mkdir("db") if (os.path.exists("db/Settings") == False): os.mkdir("db/Settings") ############################################################################### def GetFromRequestsT(Url: str, Attempt=1, headers={}) -> str: """Получить ответ по HTTP/S ссылке""" if (headers == {}): headers = { 'Accept': '*/*', 'Accept-Language': 'ru,en;q=0.9', 'Connection': 'keep-alive', 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36', } # Сайты-исключения где не надо проверять сертификат. CheckCert = True if (Url.find('opendata.digital.gov.ru') > 0): CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта # При работе с локальными адресами используем свой сертификат if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0): CheckCert = "app/static/CA" # Отправляем GET-запрос на URL response = requests.get(Url, headers=headers, timeout=600, verify=CheckCert) # Получаем статус-код ответа status_code = response.status_code if (str(status_code)[0] == "5"): # Если получен код ошибки 502, повторяем запрос до 3 раз if (Attempt <= 3): return GetFromRequestsT(Url, Attempt + 1) else: return "" # Возвращаем тело ответа в виде строки return response.text ############################################################################### def GetFromRequestsB(Url: str, Attempt=1, headers={}) -> bytes: """ Получить ответ по HTTP/S ссылке. Ответ в байтах. Например, скачаный двоичный файл. """ if (headers == {}): headers = { 'Accept': '*/*', 'Accept-Language': 'ru,en;q=0.9', 'Connection': 'keep-alive', 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36', } # Сайты-исключения где не надо проверять сертификат. CheckCert = True if (Url.find('opendata.digital.gov.ru') > 0): CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта # При работе с локальными адресами используем свой сертификат if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0): CheckCert = "app/static/CA" # Отправляем GET-запрос на URL response = requests.get(Url, headers=headers, timeout=600, verify=CheckCert) # Получаем статус-код ответа status_code = response.status_code if (str(status_code)[0] == "5"): # Если получен код ошибки 502, повторяем запрос до 3 раз if (Attempt <= 3): return GetFromRequestsB(Url, Attempt + 1) else: return None # Или в виде байтов return response.content ############################################################################### def PostFromRequestsB(Url: str, Data, Attempt=1, headers={}) -> bytes: """ Получить ответ по HTTP/S ссылке. Ответ в байтах. Например, скачаный двоичный файл. """ if (headers == {}): headers = { 'Accept': '*/*', 'Accept-Language': 'ru,en;q=0.9', 'Connection': 'keep-alive', 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36', } # Сайты-исключения где не надо проверять сертификат. CheckCert = True if (Url.find('opendata.digital.gov.ru') > 0): CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта # При работе с локальными адресами используем свой сертификат if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0): CheckCert = "app/static/CA" # Отправляем POST-запрос на URL response = requests.post(Url, headers=headers, timeout=600, verify=CheckCert, data=Data) # Получаем статус-код ответа status_code = response.status_code if (str(status_code)[0] == "5"): # Если получен код ошибки 502, повторяем запрос до 3 раз if (Attempt <= 3): return PostFromRequestsB(Url, Data, Attempt + 1) else: return None # Или в виде байтов return response.content ############################################################################### def PostFromRequestsT(Url: str, data, Attempt=1, headers={}) -> str: """ Получить ответ по HTTP/S ссылке. Ответ в виде текста. """ if (headers == {}): headers = { 'Accept': '*/*', 'Accept-Language': 'ru,en;q=0.9', 'Connection': 'keep-alive', 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36', } # Сайты-исключения где не надо проверять сертификат. CheckCert = True if (Url.find('opendata.digital.gov.ru') > 0): CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта # При работе с локальными адресами используем свой сертификат if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0): CheckCert = "app/static/CA" # Отправляем POST-запрос на URL response = requests.post(Url, headers=headers, timeout=600, verify=CheckCert, data=data) # Получаем статус-код ответа status_code = response.status_code if (str(status_code)[0] == "5"): # Если получен код ошибки 502, повторяем запрос до 3 раз if (Attempt <= 3): return PostFromRequestsT(Url, data, Attempt + 1) else: return "" # Или в виде текста return response.text ############################################################################### def DayWeek(CurDate: datetime.datetime = datetime.datetime.today()): "Получить день недели (по умолчанию - сегодня)" weekday = CurDate.weekday() return weekday + 1 # Что бы было от 1 до 7