1177 lines
47 KiB
Python
1177 lines
47 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
API с общими функциями и процедурами
|
||
Дата последней оптимизации: 22.05.2022
|
||
Дата обновления: 26.01.2024
|
||
"""
|
||
|
||
import subprocess
|
||
import uuid # Генератор GUID
|
||
import datetime # Работа с датой
|
||
import time # работа с временем
|
||
import os # Работа с файловой системой
|
||
import requests # Работа с http/s
|
||
|
||
# Для Криптографии
|
||
import hmac
|
||
import base64
|
||
import hashlib
|
||
|
||
import socket # Для проверки доступности портов
|
||
from app import app # чтение из config.py
|
||
from flask import request # Для получения метаданных о запросе
|
||
|
||
import sys # Подключаем свои библиотеки
|
||
sys.path.append("app/source")
|
||
import API_App
|
||
|
||
appname = app.config['SHMNAME']
|
||
NameModule = "API_Common"
|
||
|
||
|
||
################################################################################
|
||
class DataSession:
|
||
"""Класс для передачи данных о сесссии"""
|
||
|
||
def __init__(self):
|
||
self.status: bool = False # Действует сессия или нет
|
||
self.comment: str = ""
|
||
self.sessionid: str = ""
|
||
self.userid: str = ""
|
||
self.username: str = ""
|
||
self.stationid: str = ""
|
||
|
||
|
||
################################################################################
|
||
def HTML_Progress(Color, IsAnimated: bool, PCur: int, PMax: int, MainDiv: bool = True) -> str:
|
||
"""
|
||
Создание строки прогресса для HTML
|
||
:param Color: error, warning, blue, info, success
|
||
:param IsAnimated: True или False
|
||
:param PCur: Любое целое число
|
||
:param PMax: Любое целое число
|
||
:return: str
|
||
"""
|
||
ProgressColor = ""
|
||
|
||
if (Color.lower() == "red" or Color.lower() == "danger" or Color.lower() == "error"):
|
||
ProgressColor = "bg-danger"
|
||
|
||
if (Color.lower() == "yellow" or Color.lower() == "warning"):
|
||
ProgressColor = "bg-warning"
|
||
|
||
if (Color.lower() == "green" or Color.lower() == "success"):
|
||
ProgressColor = "bg-success"
|
||
|
||
if (Color.lower() == "info"):
|
||
ProgressColor = "bg-info"
|
||
|
||
if (IsAnimated == True):
|
||
ProgressAnim = "progress-bar-animated"
|
||
else:
|
||
ProgressAnim = ""
|
||
|
||
if (PMax > 0):
|
||
PWidth = int(round(int(PCur) / int(PMax) * 100))
|
||
else:
|
||
PWidth = 100
|
||
PCur = 100
|
||
|
||
HTML = """<div class="progress-bar progress-bar """
|
||
if (MainDiv == True):
|
||
HTML = """<div class="progress"><div class="progress-bar progress-bar-striped """
|
||
|
||
HTML += f"""{ProgressAnim} {ProgressColor}" role="progressbar"
|
||
style="width:{PWidth}%" aria-valuenow="{PCur}"
|
||
aria-valuemin="0" aria-valuemax="{PMax}"></div>"""
|
||
if (MainDiv == True):
|
||
HTML += """</div>"""
|
||
|
||
return HTML
|
||
|
||
|
||
################################################################################
|
||
def Now(Format: int = 0):
|
||
"""
|
||
Текущая дата и время:
|
||
* 0 - в секундах (timestamp, unixtime, отчет с 01.01.1970): int
|
||
* 1 - ГГГГММДД: str
|
||
* 2 - ГГГГ-ММ-ДД: str
|
||
* 10 - в миллисекундах (без точки, для уникальности файла или временной таблицы) (timestamp, unixtime, отчет с 01.01.1970): str
|
||
* 102 - ГГГГ-ММ-ДД ЧЧ:ММ:СС: str
|
||
"""
|
||
|
||
if (Format == 0):
|
||
return int(str(time.mktime(datetime.datetime.now().timetuple())).split('.')[0])
|
||
if (Format == 10):
|
||
return str(str(time.mktime(datetime.datetime.now().timetuple())).replace(".", ""))
|
||
if (Format == 1):
|
||
return str(datetime.datetime.now().strftime("%Y%m%d"))
|
||
if (Format == 2):
|
||
return str(datetime.datetime.now().strftime("%Y-%m-%d"))
|
||
if (Format == 102):
|
||
return str(datetime.datetime.now()).split('.')[0]
|
||
return ""
|
||
|
||
|
||
################################################################################
|
||
def SaveLog(Filename, Value, ShowTimeInLog=True):
|
||
"""Запись лога в конец"""
|
||
|
||
CurTime = datetime.datetime.now()
|
||
Today = CurTime.strftime("%Y-%m-%d")
|
||
|
||
# Проверяем и создаем путь для логов
|
||
if (os.path.exists("logs") == False):
|
||
os.mkdir("logs")
|
||
if (os.path.exists("logs") == False):
|
||
SaveLog(NameModule, "Не удается создать папку logs!")
|
||
return
|
||
|
||
file = open("logs/" + Filename + "_" + str(Today) + ".log", "a")
|
||
if (ShowTimeInLog == True):
|
||
file.write(str(CurTime) + ": " + Value + "\r\n")
|
||
else:
|
||
file.write(Value + "\r\n")
|
||
file.close()
|
||
|
||
if (app.config['SHOWLOG'] == True or GetVariable('Debug') == "1"):
|
||
print(str(CurTime) + ": " + Value)
|
||
|
||
|
||
###############################################################################
|
||
def validate(date_text: str):
|
||
"""
|
||
Проверка что значение является датой.
|
||
На выходе datetime.datetime
|
||
:param date_text: Проверяемая дата
|
||
:return datetime.datetime
|
||
"""
|
||
try:
|
||
# Проверка что есть доли секунды
|
||
if (date_text.find(".") < 0):
|
||
# Нет
|
||
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S').strftime("%d.%m.%Y %H:%M:%S")
|
||
else:
|
||
# Есть
|
||
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S.%f').strftime("%d.%m.%Y %H:%M:%S")
|
||
except Exception: # as E:
|
||
if (date_text == "None"):
|
||
return ""
|
||
else:
|
||
return date_text #
|
||
|
||
|
||
###############################################################################
|
||
def SQLEscape(Query: str) -> str:
|
||
"""
|
||
Экранирование одинарных кавычек для исключения SQL-инъеккции в переменных
|
||
:param Query: часть запроса для экранирования
|
||
"""
|
||
return Query.replace("'", "''")
|
||
|
||
|
||
###############################################################################
|
||
def ThreadVars_Message() -> str:
|
||
"""Полоска для сообщений на всех страницах"""
|
||
|
||
return """<div v-if="ThreadVars.message!=''" class="alert" v-bind:class="{'alert-primary':M_Blue,'alert-danger':M_Red, 'alert-success':M_Green, 'alert-warning':M_Yellow}" role="alert" >{{ThreadVars.message}}</div>"""
|
||
|
||
|
||
###############################################################################
|
||
def UserHead(Vars):
|
||
"""
|
||
Чтение заголовка из файла block_head для шаблона
|
||
:param Vars
|
||
"""
|
||
CurStr = ""
|
||
|
||
if (os.path.exists("app/templates/block_head.htm") == True):
|
||
file = open("app/templates/block_head.htm", "r")
|
||
# SaveLog(NameModule, "Чтение заголовка")
|
||
CurStr = file.read()
|
||
file.close()
|
||
|
||
CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName'])
|
||
CurStr = CurStr.replace("{{Title}}", Vars['Title'])
|
||
return CurStr
|
||
|
||
|
||
###############################################################################
|
||
def UserHeader(SessionID: str, Vars: dict) -> str:
|
||
"""
|
||
Чтение заголовка из файла block_header для шаблона
|
||
:param SessionID: ID-сессии
|
||
:param Vars: Массив переменных
|
||
"""
|
||
CurDataSession: DataSession = CheckSession(SessionID, False)
|
||
CurStr = ""
|
||
|
||
if (os.path.exists("app/templates/block_header.htm") == True):
|
||
file = open("app/templates/block_header.htm", "r")
|
||
# SaveLog(NameModule, "Чтение заголовка")
|
||
CurStr = file.read()
|
||
file.close()
|
||
|
||
CurStr = CurStr.replace("{{IPServer}}", request.host.split(":")[0])
|
||
if (request.args.get("Name") is None):
|
||
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3])
|
||
else:
|
||
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3].replace("?Name=", "/"))
|
||
CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName'])
|
||
CurStr = CurStr.replace("{{Title}}", Vars['Title'])
|
||
CurStr = CurStr.replace("{{FIO}}", CurDataSession.username)
|
||
|
||
Vars.update({"CurStr": CurStr})
|
||
Vars.update({"request": request})
|
||
|
||
return API_App.UserHeader(SessionID, Vars)
|
||
|
||
|
||
###############################################################################
|
||
def UserErrorHeader(SessionID: str, Vars: dict) -> str:
|
||
"""
|
||
Чтение заголовка из файла block_error_header для шаблона
|
||
:param SessionID: ID-сессии
|
||
:param Vars: Массив переменных
|
||
"""
|
||
CurDataSession: DataSession = CheckSession(SessionID, False)
|
||
CurStr = ""
|
||
|
||
if (os.path.exists("app/templates/block_error_header.htm") == True):
|
||
file = open("app/templates/block_error_header.htm", "r")
|
||
# SaveLog(NameModule, "Чтение заголовка")
|
||
CurStr = file.read()
|
||
file.close()
|
||
|
||
CurStr = CurStr.replace("{{IPServer}}", request.host.split(":")[0])
|
||
if (request.args.get("Name") is None):
|
||
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3])
|
||
else:
|
||
CurStr = CurStr.replace("{{PageName}}", request.url.split('/')[3].replace("?Name=", "/"))
|
||
CurStr = CurStr.replace("{{ProgName}}", Vars['ProgName'])
|
||
CurStr = CurStr.replace("{{Title}}", Vars['Title'])
|
||
CurStr = CurStr.replace("{{FIO}}", CurDataSession.username)
|
||
return CurStr
|
||
|
||
|
||
###############################################################################
|
||
def Modals(Vars) -> str:
|
||
"""Чтение из файла block_modals для шаблона
|
||
:param Vars: Массив переменных. Пока не используется.
|
||
"""
|
||
CurStr = ""
|
||
|
||
if (os.path.exists("app/templates/block_modals.htm") == True):
|
||
file = open("app/templates/block_modals.htm", "r")
|
||
# SaveLog(NameModule, "Чтение заголовка")
|
||
CurStr = file.read()
|
||
file.close()
|
||
return CurStr
|
||
|
||
|
||
###############################################################################
|
||
def UpdateSession(SessionID: str):
|
||
"""
|
||
Процедура продляет существование сессии
|
||
"""
|
||
|
||
if (app.config['SHMNAME'] == 'kmsik'):
|
||
return API_App.UpdateSession(SessionID) # Нестандартное обновление сессии
|
||
|
||
# Если сессия не существует или уже просрочена, то ничего не делаем
|
||
CurDataSession: DataSession = CheckSession(SessionID, False)
|
||
if (CurDataSession.status == False):
|
||
return
|
||
|
||
if (CurDataSession.sessionid != SessionID):
|
||
return
|
||
|
||
# Продляем проверочную сессию
|
||
DBEG = time.mktime(datetime.datetime.now().timetuple())
|
||
DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии
|
||
|
||
# Продляем сессию еще на 15 минут
|
||
SaveDataSession(SessionID, "endsession", str(DEND))
|
||
|
||
|
||
###############################################################################
|
||
def ReadDataSession(SessionID, Variable, IsByte=False):
|
||
"""
|
||
Считываем значение переменной сессии
|
||
:param SessionID: Сессия
|
||
:param Variable: Параметр
|
||
:param IsByte: Результат будет двоичные данные или нет
|
||
:return При нестандартной ситуации возвращает пустоту
|
||
"""
|
||
|
||
TempVar = ""
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
|
||
SaveLog(NameModule, "ReadDataSession: Путь не существует: /dev/shm/" + app.config['SHMNAME'] + "")
|
||
return ""
|
||
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
|
||
SaveLog(NameModule, "ReadDataSession: Путь не существует: /dev/shm/" + app.config['SHMNAME'] + "/sessions")
|
||
return ""
|
||
|
||
# Проверяем наличие сессии
|
||
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
|
||
if (os.path.exists(CurPath) == True):
|
||
if (os.path.exists(CurPath + "/" + Variable) == True):
|
||
if (IsByte == True):
|
||
file = open(CurPath + "/" + Variable, "rb")
|
||
else:
|
||
file = open(CurPath + "/" + Variable, "r")
|
||
TempVar = file.read()
|
||
file.close()
|
||
return TempVar
|
||
else:
|
||
SaveLog(NameModule, "ReadDataSession: Переменная не существует: " + CurPath + "/" + Variable)
|
||
else:
|
||
SaveLog(NameModule, "ReadDataSession: Путь не существует: " + CurPath)
|
||
return ""
|
||
|
||
|
||
###############################################################################
|
||
def SaveDataSession(SessionID, Variable, Value, IsByte=False) -> bool:
|
||
"""
|
||
Сохраняем переменную сессии
|
||
:param SessionID: Сессия
|
||
:param Variable: Параметр
|
||
:param Value: Сохраняемое значение
|
||
:param IsByte: Сохранить бинарно или нет
|
||
:return При нестандартной ситуации возвращает False
|
||
"""
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
|
||
return False
|
||
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
|
||
return False
|
||
|
||
# Проверяем наличие сессии
|
||
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
|
||
if (os.path.exists(CurPath) == True):
|
||
if (IsByte == False):
|
||
file = open(CurPath + "/" + Variable, "w")
|
||
else:
|
||
file = open(CurPath + "/" + Variable, "wb")
|
||
file.write(Value)
|
||
file.close()
|
||
return True
|
||
return False
|
||
|
||
|
||
###############################################################################
|
||
def CheckSession(SessionID, FlagNeedUpdate) -> DataSession:
|
||
"""
|
||
Процедура проверяет существование сессии
|
||
:param SessionID: Сессия
|
||
:param FlagNeedUpdate: Требуется ли обновление сессии при ее проверке
|
||
:return При существовании сессии .status = True
|
||
"""
|
||
CurDataSession = DataSession()
|
||
|
||
# На пустую сессию ничего не возвращать
|
||
if (str(SessionID) == "" or SessionID is None):
|
||
CurDataSession.comment = "Не передан SessionID"
|
||
return CurDataSession
|
||
|
||
# Текущее время в секундах
|
||
DBEG = time.mktime(datetime.datetime.now().timetuple())
|
||
CurTime = datetime.datetime.now()
|
||
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
|
||
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
|
||
CurDataSession.comment = "Не удается создать каталог для сессии"
|
||
return CurDataSession
|
||
|
||
# Проверяем наличие сессии
|
||
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
|
||
if (os.path.exists(CurPath) == True):
|
||
EndSession = ReadDataSession(SessionID, "endsession")
|
||
|
||
# Если нет информации об окончании сессии пробуем еще раз
|
||
if (EndSession == ""):
|
||
time.sleep(1)
|
||
EndSession = ReadDataSession(SessionID, "endsession")
|
||
|
||
# Если так нет информации об окончании сессии то пишем ошибку
|
||
if (EndSession == ""):
|
||
SaveLog(NameModule, "CheckSession: Нет информации об окончании сессии: " + SessionID + ".")
|
||
CurDataSession.status = False
|
||
CurDataSession.comment = "Нет информации об окончании сессии"
|
||
return CurDataSession
|
||
|
||
# Если сессия уже истекла, то удаляем её
|
||
if (float(DBEG) > float(EndSession)):
|
||
SaveLog(NameModule, str(CurTime) + ": Сессия " + SessionID + " истекла.")
|
||
DeleteSession(SessionID)
|
||
CurDataSession.comment = "Сессия истекла"
|
||
return CurDataSession
|
||
|
||
# Если всё ОК то заполняем данные
|
||
CurDataSession.userid = str(ReadDataSession(SessionID, "userid"))
|
||
CurDataSession.username = str(ReadDataSession(SessionID, "username"))
|
||
CurDataSession.stationid = str(ReadDataSession(SessionID, "stationid"))
|
||
CurDataSession.sessionid = SessionID
|
||
|
||
# Продлеваем если необходимо
|
||
if (FlagNeedUpdate == True):
|
||
UpdateSession(SessionID)
|
||
|
||
CurDataSession.status = True
|
||
CurDataSession.comment = "Сессия существует"
|
||
return CurDataSession # возвращаем данные сессии
|
||
|
||
CurDataSession.comment = "Сессия не существует"
|
||
return CurDataSession # возвращаем данные сессии
|
||
|
||
|
||
###############################################################################
|
||
def CreateSession(UserID: str, UserName: str, StationID: str = "", SessionID: str = "", IPClient: str = ""):
|
||
"""Процедура создает сессию, после успешного определения авторизации
|
||
:param UserID
|
||
:param UserName
|
||
:param StationID: Не обязательный параметр
|
||
:param SessionID: Не обязательный параметр
|
||
:param IPClient: Не обязательный параметр
|
||
"""
|
||
if (app.config['SHMNAME'] == 'kmsik'):
|
||
return API_App.CreateSession(UserID, UserName, StationID, SessionID, IPClient) # Для КМС-ИК
|
||
|
||
# Удаление старых сессии
|
||
DeleteExpiredSession()
|
||
|
||
# защита от создания сессии на пустой ID
|
||
if (UserID == "" or UserID == "0"):
|
||
return ""
|
||
|
||
# Переменные
|
||
DBEG = time.mktime(datetime.datetime.now().timetuple())
|
||
DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии
|
||
CurTime = datetime.datetime.now()
|
||
if (SessionID == ""):
|
||
SessionID = "S_" + str(uuid.uuid4())
|
||
SaveLog(NameModule, str(CurTime) + ": Создание сессии " + SessionID)
|
||
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
|
||
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
|
||
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
|
||
|
||
# создаем файл для сессии
|
||
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
|
||
os.mkdir(CurPath)
|
||
|
||
# Сохраняем данные
|
||
SaveLog(NameModule, str(CurTime) + ": Сохраняем данные сессии " + SessionID)
|
||
SaveDataSession(SessionID, "ipclient", str(IPClient))
|
||
SaveDataSession(SessionID, "userid", str(UserID))
|
||
SaveDataSession(SessionID, "username", str(UserName))
|
||
SaveDataSession(SessionID, "startsession", str(DEND))
|
||
SaveDataSession(SessionID, "endsession", str(DEND))
|
||
SaveDataSession(SessionID, "stationid", str(StationID))
|
||
|
||
SaveLog(NameModule, "Успешный вход в систему: " + UserName)
|
||
return SessionID # возвращаем ID сессии
|
||
|
||
|
||
###############################################################################
|
||
def CreateSessionVK(UserID, UserName):
|
||
"""
|
||
Процедура создает сессию, после успешного определения авторизации
|
||
:param UserID:
|
||
:param UserName:
|
||
"""
|
||
# Удаление старых сессии
|
||
DeleteExpiredSession()
|
||
|
||
# защита от создания сессии на пустой ID
|
||
if (UserID == "" or UserID == "0"):
|
||
return ""
|
||
|
||
# Переменные
|
||
DBEG = time.mktime(datetime.datetime.now().timetuple())
|
||
DEND = DBEG + int(app.config['MAX_TIME_SESSION']) # время сессии
|
||
SessionID = str(uuid.uuid4())
|
||
CurTime = datetime.datetime.now()
|
||
SaveLog(NameModule, str(CurTime) + ": Создание сессии ВК " + SessionID)
|
||
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
|
||
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
|
||
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
|
||
|
||
# создаем файл для сессии
|
||
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
|
||
os.mkdir(CurPath)
|
||
|
||
# Сохраняем данные
|
||
SaveLog(NameModule, str(CurTime) + ": Сохраняем данные сессии " + SessionID)
|
||
SaveDataSession(SessionID, "userid", str(UserID))
|
||
SaveDataSession(SessionID, "username", str(UserName))
|
||
SaveDataSession(SessionID, "startsession", str(DEND))
|
||
SaveDataSession(SessionID, "endsession", str(DEND))
|
||
SaveDataSession(SessionID, "fromvk", '1')
|
||
|
||
SaveLog(NameModule, "Успешный вход в систему из под ВК: " + UserName)
|
||
|
||
return SessionID # возвращаем ID сессии
|
||
|
||
|
||
###############################################################################
|
||
def DeleteExpiredSession():
|
||
"""Процедура удаляет из базы истекшие сессии"""
|
||
|
||
# Текущее время в секундах
|
||
DBEG = time.mktime(datetime.datetime.now().timetuple())
|
||
CurTime = datetime.datetime.now()
|
||
|
||
SaveLog(NameModule, str(CurTime) + ": Запуск удаления старых сессии ")
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
|
||
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
|
||
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
|
||
|
||
# Получаем каталог сессии
|
||
for SessionID in os.listdir(f"/dev/shm/{app.config['SHMNAME']}/sessions"):
|
||
CurPath = f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID
|
||
# проверяем не файл ли это
|
||
if (os.path.isfile(CurPath) == False):
|
||
# проверяем срок сессии
|
||
EndSession = ReadDataSession(SessionID, "endsession")
|
||
|
||
if (EndSession == ""):
|
||
DeleteSession(SessionID)
|
||
else:
|
||
# Если сессия уже истекла, то удаляем её
|
||
if (float(DBEG) > float(EndSession)):
|
||
SaveLog(NameModule, str(CurTime) + ": Сессия " + SessionID + " истекла.")
|
||
DeleteSession(SessionID)
|
||
|
||
|
||
###############################################################################
|
||
def DeleteSession(SessionID):
|
||
"""Процедура удаляет из базы конкретную сессию"""
|
||
|
||
CurTime = datetime.datetime.now()
|
||
if (SessionID == ""):
|
||
SaveLog(NameModule, str(CurTime) + ": Сессия для удаления не указана!")
|
||
return False
|
||
|
||
SaveLog(NameModule, str(CurTime) + ": Удаление сессии " + SessionID)
|
||
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}") == False):
|
||
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}")
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions") == False):
|
||
os.mkdir(f"/dev/shm/{app.config['SHMNAME']}/sessions")
|
||
|
||
# создаем файл для сессии
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID) == True):
|
||
|
||
# Удаляем все файлы сессии
|
||
for f in os.listdir(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID):
|
||
os.remove(f"/dev/shm/{app.config['SHMNAME']}/sessions/{SessionID}/{f}")
|
||
|
||
# Удаляем папку сессии
|
||
os.rmdir(f"/dev/shm/{app.config['SHMNAME']}/sessions/" + SessionID)
|
||
|
||
|
||
###############################################################################
|
||
def GetVariable(Variable: str) -> str:
|
||
"""
|
||
Считываем настройку программы
|
||
:param Variable: имя переменной
|
||
"""
|
||
|
||
TempVar = ""
|
||
|
||
# Проверяем наличие сессии
|
||
CurPath = "db/Settings/"
|
||
|
||
# Если работает с Виртуальной памятью, то ищем настройки там
|
||
if (os.path.exists(f"/dev/shm/{app.config['SHMNAME']}/Settings") == True):
|
||
CurPath = f"/dev/shm/{app.config['SHMNAME']}/Settings/"
|
||
|
||
if (os.path.exists(CurPath) == True):
|
||
if (os.path.exists(CurPath + "/" + Variable) == True):
|
||
file = open(CurPath + "/" + Variable, "r")
|
||
TempVar = file.read()
|
||
file.close()
|
||
return TempVar.strip()
|
||
|
||
return ""
|
||
|
||
|
||
###############################################################################
|
||
def SaveVariable(Variable, Value):
|
||
"""
|
||
Сохраняем переменную от настроек программы
|
||
:param Variable: Параметр
|
||
:param Value: Сохраняемое значение
|
||
"""
|
||
|
||
# Проверяем пусть
|
||
if (os.path.exists("db") == False):
|
||
return ""
|
||
if (os.path.exists("db/Settings") == False):
|
||
return ""
|
||
|
||
# Сохраняем настройки в программе
|
||
CurPath = "db/Settings/"
|
||
if (os.path.exists(CurPath) == True):
|
||
file = open(CurPath + "/" + Variable, "w")
|
||
file.write(str(Value))
|
||
file.close()
|
||
|
||
# Сохраняем настройки в памяти
|
||
CurPath = f"/dev/shm/{app.config['SHMNAME']}/Settings/"
|
||
if (os.path.exists(CurPath) == True):
|
||
file = open(CurPath + "/" + Variable, "w")
|
||
file.write(str(Value))
|
||
file.close()
|
||
|
||
return True
|
||
return False
|
||
|
||
|
||
###############################################################################
|
||
def GetSHA512_FromFile(Filename: str, secret: str) -> str:
|
||
"""
|
||
Получить хэш на основе секретного ключа
|
||
:param Filename: путь к файлу
|
||
:param secret:
|
||
"""
|
||
if (os.path.exists(Filename) == False):
|
||
return ""
|
||
|
||
# Расчет хэша
|
||
file = open(Filename, "rb")
|
||
CurStr = file.read()
|
||
file.close()
|
||
return GetSHA512_File(CurStr, secret)
|
||
|
||
|
||
###############################################################################
|
||
def GetSHA512_File(message: bytes, secret: str) -> str:
|
||
"""
|
||
Получить хэш из массива байтов на основе секретного ключа
|
||
:param message: bytes
|
||
:param secret
|
||
"""
|
||
|
||
h = hmac.new(bytearray(secret, "UTF-8"), message, hashlib.sha512)
|
||
return base64.b64encode(h.digest()).decode()
|
||
|
||
|
||
###############################################################################
|
||
def GetSHA512_Text(message: str, secret: str) -> str:
|
||
"""Получить хэш из текста на основе секретного ключа"""
|
||
h = hmac.new(bytearray(secret, "UTF-8"), bytearray(message, "UTF-8"), hashlib.sha512)
|
||
return base64.b64encode(h.digest()).decode()
|
||
|
||
|
||
###############################################################################
|
||
def GetSHA1_Text(message: str) -> str:
|
||
"""Получить хэш из текста на основе секретного ключа. Совместим с хэшем SQL 2005"""
|
||
hash_object = hashlib.sha1(bytearray(message, "UTF-8"))
|
||
hex_dig = hash_object.hexdigest()
|
||
return hex_dig
|
||
|
||
|
||
###############################################################################
|
||
def CheckPort(ip, port) -> str:
|
||
"""
|
||
Проверка порта на его открытие
|
||
"OPEN" или "CLOSE"
|
||
"""
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(1)
|
||
|
||
ip = ip.split("\\")[0]
|
||
Attempt = 0
|
||
LastStatus = ""
|
||
SaveLog(NameModule, "Проверка порта на: " + ip + ":" + str(port))
|
||
|
||
# Проверяем трижды или до 1 успеха
|
||
while (Attempt < 3):
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(1)
|
||
try:
|
||
sock.connect((ip, int(port)))
|
||
SaveLog(NameModule, 'CheckPort: Порт ' + str(port) + ' открыт.')
|
||
# conn.close()
|
||
return "OPEN"
|
||
except Exception as E:
|
||
SaveLog(NameModule, "CheckPort: " + str(E))
|
||
# LastStatus = "ERROR"
|
||
|
||
time.sleep(1)
|
||
LastStatus = "CLOSE"
|
||
Attempt = Attempt + 1
|
||
SaveLog(NameModule, 'CheckPort: Порт ' + str(port) + ' закрыт.')
|
||
return LastStatus
|
||
|
||
|
||
###############################################################################
|
||
def RenameDataSession(SessionID, OldNameVar, NewNameVar) -> bool:
|
||
"""
|
||
Переименовываем переменную сессии
|
||
"""
|
||
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
|
||
return False
|
||
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
|
||
return False
|
||
|
||
# Проверяем наличие сессии
|
||
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
|
||
if (os.path.exists(CurPath) == True):
|
||
os.rename(CurPath + "/" + OldNameVar, CurPath + "/" + NewNameVar)
|
||
return True
|
||
return False
|
||
|
||
|
||
###############################################################################
|
||
def RemoveFromFolder(folder, RemoveSelf=False) -> str:
|
||
"""
|
||
Удалить всё из указанной папки
|
||
* RemoveSelf - Удалить саму папку (уже пустую)
|
||
return: Текст ошибки
|
||
"""
|
||
|
||
print('Обработка папки на удаление : ' + folder)
|
||
for filename in os.listdir(folder): # Получаем список файлов в папке
|
||
file_path = os.path.join(folder, filename) # Полный путь к файлу или папке
|
||
try:
|
||
if os.path.isfile(file_path): # Если это файл
|
||
print('Удаляем файл: ' + file_path)
|
||
os.remove(file_path) # Удалить
|
||
elif os.path.isdir(file_path): # Если это папка
|
||
RemoveFromFolder(file_path) # Удаляем содержимое этой папки (рекурсия)
|
||
print('Удаляем папку : ' + file_path)
|
||
os.rmdir(file_path) # А затем удалить саму эту папку
|
||
except Exception as e:
|
||
TextLog = f'Не удалось удалить {file_path}. Причина: {e}'
|
||
SaveLog(NameModule, TextLog)
|
||
return TextLog
|
||
if (RemoveSelf == True):
|
||
print('Удаляем папку : ' + folder)
|
||
os.rmdir(folder) # А затем удалить саму эту папку
|
||
print('Обработка папки на удаление завершена: ' + folder)
|
||
return ""
|
||
|
||
|
||
###############################################################################
|
||
def RemoveDataSession(SessionID, Variable):
|
||
"""
|
||
Удаляем переменную сессии
|
||
"""
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "") == False):
|
||
return False
|
||
if (os.path.exists("/dev/shm/" + app.config['SHMNAME'] + "/sessions") == False):
|
||
return False
|
||
|
||
# Проверяем наличие сессии
|
||
CurPath = "/dev/shm/" + app.config['SHMNAME'] + "/sessions/" + SessionID
|
||
if (os.path.exists(CurPath) == True):
|
||
if (os.path.isfile(CurPath + "/" + Variable)):
|
||
os.remove(CurPath + "/" + Variable)
|
||
return True
|
||
return False
|
||
|
||
|
||
########################################################################
|
||
def GetSHA1_FromFile(Filename):
|
||
"""Хэш для совместимости с MSSQL"""
|
||
if (os.path.exists(Filename) == False):
|
||
return ""
|
||
|
||
# Расчет хэша
|
||
file = open(Filename, "rb")
|
||
CurStr = file.read()
|
||
file.close()
|
||
|
||
hash_object = hashlib.sha1(CurStr)
|
||
hex_dig = hash_object.hexdigest()
|
||
|
||
return hex_dig
|
||
|
||
|
||
########################################################################
|
||
def ConvertStrToDate(date_text: str) -> datetime.datetime:
|
||
"""
|
||
Проверка что строка является датой.
|
||
* Поддерживается ГГГГММДД, ГГГГ-ММ-ДД, ДД.ММ.ГГГГ.
|
||
* Поддерживается ГГГГ-ММ-ДД ЧЧ:ММ
|
||
* Поддерживается ГГГГ-ММ-ДД ЧЧ:ММ:СС
|
||
* Поддерживается ГГГГ-ММ-ДД ЧЧ:ММ:СС.мс
|
||
* Поддерживается с буквой T вместо пробела.
|
||
|
||
|
||
"""
|
||
try:
|
||
|
||
date_text = date_text.strip()
|
||
|
||
# Если слишком мало текста для преобразования
|
||
# Дата не указана полностью
|
||
if (len(date_text) < 8 or len(date_text) == 9):
|
||
return None # type: ignore
|
||
|
||
# нет секунд. Вставляем их.
|
||
# print("date_text: " + date_text)
|
||
# print(f"len date_text: {len(date_text)}")
|
||
# print(f"""date_text.find("T": {date_text.find("T")}""")
|
||
|
||
if (len(date_text) == 16 and (date_text.find("T") == 10)):
|
||
date_text = date_text + ":00"
|
||
print("date_text: " + date_text)
|
||
|
||
# время не указано полностью
|
||
if (len(date_text) > 10 and len(date_text) < 19):
|
||
return None # type: ignore
|
||
|
||
# не 1-х и не 2-х тысячалетие
|
||
if (len(date_text) == 8 and date_text[:1] not in ('1', '2')):
|
||
return None # type: ignore
|
||
|
||
# ГГГГММДД
|
||
if (len(date_text) == 8):
|
||
return datetime.datetime.strptime(date_text, '%Y%m%d')
|
||
|
||
# Если точка идет 3-им символов (для формата ДД.ММ.ГГГГ )
|
||
if (date_text.find('.') == 2 or date_text.find('/') == 2 or date_text.find('-') == 2):
|
||
date_text = f"{date_text[6:10]}-{date_text[3:5]}-{date_text[0:2]}"
|
||
|
||
if (len(date_text) == 10):
|
||
return datetime.datetime.strptime(date_text.replace(".", "-"), '%Y-%m-%d')
|
||
|
||
# Если время разделено буквой T
|
||
if (date_text.find("T") > 0):
|
||
if (date_text.find(".") < 0): # Проверка что есть доли секунды
|
||
# Нет
|
||
return datetime.datetime.strptime(date_text, '%Y-%m-%dT%H:%M:%S')
|
||
else:
|
||
# Есть
|
||
return datetime.datetime.strptime(date_text, '%Y-%m-%dT%H:%M:%S.%f')
|
||
else:
|
||
# Проверка что есть доли секунды
|
||
if (date_text.find(".") < 0):
|
||
# Нет
|
||
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
# Есть
|
||
return datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S.%f')
|
||
except Exception:
|
||
if (date_text == "None"):
|
||
return None # type: ignore
|
||
else:
|
||
return date_text # type: ignore
|
||
|
||
|
||
###############################################################################
|
||
def ConvertDate(d: datetime.datetime, NewType=102) -> str:
|
||
"""
|
||
Преобразование даты в указанный формат:
|
||
* 102 - ГГГГ-ММ-ДД
|
||
* 104 - ДД.ММ.ГГГГ
|
||
* 107 - январь 2023
|
||
* 108 - Январь 2023
|
||
* 109 - ЯНВАРЬ 2023
|
||
* 112 - ГГГГММДД
|
||
* 122 - ГГГГ-ММ-ДД ЧЧ:ММ:СС
|
||
|
||
"""
|
||
M = {}
|
||
M.update({"01": "январь"})
|
||
M.update({"02": "февраль"})
|
||
M.update({"03": "март"})
|
||
M.update({"04": "апрель"})
|
||
M.update({"05": "май"})
|
||
M.update({"06": "июнь"})
|
||
M.update({"07": "июль"})
|
||
M.update({"08": "август"})
|
||
M.update({"09": "сентябрь"})
|
||
M.update({"10": "октябрь"})
|
||
M.update({"11": "ноябрь"})
|
||
M.update({"12": "декабрь"})
|
||
|
||
if NewType == 102:
|
||
return d.strftime("%Y-%m-%d")
|
||
if NewType == 104:
|
||
return d.strftime("%d.%m.%Y")
|
||
|
||
if NewType == 107:
|
||
return M[d.strftime("%m")] + " " + d.strftime("%Y")
|
||
if NewType == 108:
|
||
return M[d.strftime("%m")].capitalize() + " " + d.strftime("%Y")
|
||
if NewType == 109:
|
||
return M[d.strftime("%m")].upper() + " " + d.strftime("%Y")
|
||
|
||
if NewType == 112:
|
||
return d.strftime("%Y%m%d")
|
||
|
||
if NewType == 122:
|
||
return d.strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
return ""
|
||
|
||
|
||
###############################################################################
|
||
def GetTZ():
|
||
"""Получить текущий часовой пояс"""
|
||
|
||
now = datetime.datetime.now()
|
||
Z = str(now.replace(tzinfo=datetime.timezone.utc) - now.astimezone(datetime.timezone.utc)).split(':')[0]
|
||
return int(Z)
|
||
|
||
|
||
###############################################################################
|
||
def GetStatusService(ServiceName) -> str:
|
||
# Проверка статуса
|
||
Result = subprocess.run("systemctl -all | grep " + ServiceName, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||
|
||
if (str(Result.returncode) == "1"):
|
||
return "None"
|
||
|
||
# Проверка статуса
|
||
p = subprocess.Popen(["systemctl", "is-active", ServiceName], stdout=subprocess.PIPE)
|
||
(output, err) = p.communicate()
|
||
output = output.decode('utf-8')
|
||
# print(output)
|
||
return output.strip()
|
||
|
||
|
||
###############################################################################
|
||
def CreateDBSettings():
|
||
"""Создание базы и таблицы настроек"""
|
||
|
||
# Проверяем и создаем путь для локальных сессий
|
||
if (os.path.exists("db") == False):
|
||
os.mkdir("db")
|
||
if (os.path.exists("db/Settings") == False):
|
||
os.mkdir("db/Settings")
|
||
|
||
|
||
###############################################################################
|
||
def GetFromRequestsT(Url: str, Attempt=1, headers={}) -> str:
|
||
"""Получить ответ по HTTP/S ссылке"""
|
||
|
||
if (headers == {}):
|
||
headers = {
|
||
'Accept': '*/*',
|
||
|
||
'Accept-Language': 'ru,en;q=0.9',
|
||
'Connection': 'keep-alive',
|
||
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
|
||
}
|
||
|
||
# Сайты-исключения где не надо проверять сертификат.
|
||
CheckCert = True
|
||
# TODO: Узнать бы какой сертификат тут используется.
|
||
if (Url.find('opendata.digital.gov.ru') > 0):
|
||
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
|
||
|
||
# При работе с локальными адресами используем свой сертификат
|
||
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
|
||
CheckCert = "app/static/CA"
|
||
|
||
# Отправляем GET-запрос на URL
|
||
response = requests.get(Url, headers=headers, timeout=600, verify=CheckCert)
|
||
|
||
# Получаем статус-код ответа
|
||
status_code = response.status_code
|
||
|
||
if (str(status_code)[0] == "5"):
|
||
# Если получен код ошибки 502, повторяем запрос до 3 раз
|
||
if (Attempt <= 3):
|
||
return GetFromRequestsT(Url, Attempt + 1)
|
||
else:
|
||
return ""
|
||
|
||
# Возвращаем тело ответа в виде строки
|
||
return response.text
|
||
|
||
|
||
###############################################################################
|
||
def GetFromRequestsB(Url: str, Attempt=1, headers={}) -> bytes:
|
||
"""
|
||
Получить ответ по HTTP/S ссылке.
|
||
Ответ в байтах. Например, скачаный двоичный файл.
|
||
"""
|
||
|
||
if (headers == {}):
|
||
headers = {
|
||
'Accept': '*/*',
|
||
|
||
'Accept-Language': 'ru,en;q=0.9',
|
||
'Connection': 'keep-alive',
|
||
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
|
||
}
|
||
|
||
# Сайты-исключения где не надо проверять сертификат.
|
||
CheckCert = True
|
||
# TODO: Узнать бы какой сертификат тут используется.
|
||
if (Url.find('opendata.digital.gov.ru') > 0):
|
||
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
|
||
|
||
# При работе с локальными адресами используем свой сертификат
|
||
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
|
||
CheckCert = "app/static/CA"
|
||
|
||
# Отправляем GET-запрос на URL
|
||
response = requests.get(Url, headers=headers, timeout=600, verify=CheckCert)
|
||
|
||
# Получаем статус-код ответа
|
||
status_code = response.status_code
|
||
|
||
if (str(status_code)[0] == "5"):
|
||
# Если получен код ошибки 502, повторяем запрос до 3 раз
|
||
if (Attempt <= 3):
|
||
return GetFromRequestsB(Url, Attempt + 1)
|
||
else:
|
||
return None # type: ignore
|
||
|
||
# Или в виде байтов
|
||
return response.content
|
||
|
||
|
||
###############################################################################
|
||
def PostFromRequestsB(Url: str, Data, Attempt=1, headers=None) -> bytes:
|
||
"""
|
||
Получить ответ по HTTP/S ссылке.
|
||
Ответ в байтах. Например, скачаный двоичный файл.
|
||
"""
|
||
|
||
if (headers is None):
|
||
headers = {
|
||
'Accept': '*/*',
|
||
|
||
'Accept-Language': 'ru,en;q=0.9',
|
||
'Connection': 'keep-alive',
|
||
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
|
||
}
|
||
|
||
# Сайты-исключения где не надо проверять сертификат.
|
||
CheckCert = True
|
||
# TODO: Узнать бы какой сертификат тут используется.
|
||
if (Url.find('opendata.digital.gov.ru') > 0):
|
||
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
|
||
|
||
# При работе с локальными адресами используем свой сертификат
|
||
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
|
||
CheckCert = "app/static/CA"
|
||
|
||
# Отправляем POST-запрос на URL
|
||
response = requests.post(Url, headers=headers, timeout=600, verify=CheckCert, data=Data)
|
||
|
||
# Получаем статус-код ответа
|
||
status_code = response.status_code
|
||
|
||
if (str(status_code)[0] == "5"):
|
||
# Если получен код ошибки 502, повторяем запрос до 3 раз
|
||
if (Attempt <= 3):
|
||
return PostFromRequestsB(Url, Data, Attempt + 1)
|
||
else:
|
||
return None
|
||
|
||
# Или в виде байтов
|
||
return response.content
|
||
|
||
|
||
###############################################################################
|
||
def PostFromRequestsT(Url: str, data, Attempt=1, headers=None) -> dict:
|
||
"""
|
||
Получить ответ по HTTP/S ссылке.
|
||
Ответ в виде текста.
|
||
"""
|
||
Answer = {}
|
||
if (headers is None):
|
||
headers = {
|
||
'Accept': '*/*',
|
||
|
||
'Accept-Language': 'ru,en;q=0.9',
|
||
'Connection': 'keep-alive',
|
||
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.142 Safari/537.36',
|
||
}
|
||
|
||
# Сайты-исключения где не надо проверять сертификат.
|
||
CheckCert = True
|
||
|
||
# TODO: Узнать бы какой сертификат тут используется.
|
||
if (Url.find('opendata.digital.gov.ru') > 0):
|
||
CheckCert = False # Игнорировать или нет проверку сертификата для этого сайта
|
||
|
||
# При работе с локальными адресами используем свой сертификат
|
||
if (Url.find('192.168.') > 0 or Url.find('10.') > 0 or Url.find('127.0.0.1') > 0 or Url.find('rosminzdrav.ru') > 0):
|
||
CheckCert = "app/static/CA"
|
||
|
||
# Отправляем POST-запрос на URL
|
||
response = requests.post(Url, headers=headers, timeout=600, verify=CheckCert, data=data)
|
||
|
||
# Получаем статус-код ответа
|
||
status_code = response.status_code
|
||
Answer.update({"code": str(status_code)})
|
||
Answer.update({"text": response.text})
|
||
|
||
if (str(status_code)[0] == "5"):
|
||
# Если получен код ошибки 502, повторяем запрос до 3 раз
|
||
if (Attempt <= 3):
|
||
return PostFromRequestsT(Url, data, Attempt + 1)
|
||
else:
|
||
Answer.update({"status": "error"})
|
||
return Answer
|
||
|
||
if (str(status_code)[0] == "2"):
|
||
Answer.update({"status": "ok"})
|
||
else:
|
||
Answer.update({"status": "error"})
|
||
|
||
# Или в виде текста
|
||
return Answer
|
||
|
||
|
||
###############################################################################
|
||
def DayWeek(CurDate: datetime.datetime = datetime.datetime.today()):
|
||
"Получить день недели (по умолчанию - сегодня)"
|
||
weekday = CurDate.weekday()
|
||
return weekday + 1 # Что бы было от 1 до 7
|
||
|
||
|
||
###############################################################################
|
||
def SaveFile(Filename, Content, IsBinary=False) -> str:
|
||
"""
|
||
Сохранить файл с заменой существующего файла.
|
||
* Filename - полный путь к файлу.
|
||
* Content - содержимое файла.
|
||
* IsBinary - является ли содержимое двоичным или нет.
|
||
return - пустота если всё хорошо или текст ошибки.
|
||
"""
|
||
Mode = 'w' # Текстовый
|
||
if (IsBinary == True):
|
||
Mode = 'wb' # Бинарный
|
||
|
||
try:
|
||
with open(Filename, Mode) as fp:
|
||
fp.write(Content)
|
||
fp.close()
|
||
return '' # Всё хорошо
|
||
except Exception as E:
|
||
return str(E)
|
||
|
||
|
||
###############################################################################
|
||
def NewGUID() -> str:
|
||
"Сгененрировать новый GUID"
|
||
return str(uuid.uuid4())
|