Подписка на уведомления о значениях тега по протоколу WebSocket
При использовании Kaspersky Industrial CyberSecurity for Networks API стороннее приложение может создавать подписку на уведомления об изменении значений определенного тега. Для создания подписки и получения уведомлений используется протокол WebSocket.
Сценарий подписки для стороннего приложения состоит из следующих этапов:
- Стороннее приложение устанавливает соединение с Сервером Kaspersky Industrial CyberSecurity for Networks через коннектор для этого приложения с использованием сервера REST API.
После успешного соединения с Сервером коннектору отправляется токен аутентификации. Коннектор использует токен аутентификации для всех последующих взаимодействий с Сервером в этом сеансе (в частности для запроса своей конфигурации с Сервера).
- Стороннее приложение подключается с использованием WebSocket и отправляет запрос для создания подписки на уведомления о получении значений нужного тега.
Сервер Kaspersky Industrial CyberSecurity for Networks принимает запрос и создает подписку. Для отправки запроса используются соответствующие функции, предусмотренные протоколом WebSocket.
- Kaspersky Industrial CyberSecurity for Networks обнаруживает в трафике новое значение при чтении или записи тега.
- Kaspersky Industrial CyberSecurity for Networks отправляет полученное значение тега стороннему приложению, для которого действует подписка на уведомления о получении значений этого тега.
Основные особенности реализации подписки:
- После того, как стороннее приложение указывает нужные теги Kaspersky Industrial CyberSecurity for Networks, Сервер отправляет подтверждение о возможности получать значения этих тегов. Далее стороннее приложение ожидает поступление значений этих тегов по установленному соединению.
- Для создания и сопровождения подписки используется протокол WebSocket и соединение по тому же адресу, который используется сервером REST API.
- Сервер Kaspersky Industrial CyberSecurity for Networks поддерживает не более одной активной подписки на значения тегов. Если активная подписка уже создана и используется, при попытке создания еще одной подписки возвращается ошибка о слишком большом количестве подключений.
- Стороннее приложение имеет возможность в любой момент закрыть установленное соединение по подписке для прекращения получения значений тега.
- Подписка прекращается либо после ее закрытия сторонним приложением, либо при разрыве соединения. Если Сервер Kaspersky Industrial CyberSecurity for Networks был временно недоступен (разорвано соединение) и отправка значений по подписке не выполнялась, то после восстановления соединения стороннему приложению потребуется заново подписаться на получение значений тегов.
Подключение с использованием WebSocket
Для получения тегов по подписке могут использоваться как стандартные функции WebSocket, так и библиотека SignalR Core. Пакеты для работы с библиотекой SignalR Core доступны для наиболее распространенных языков программирования: С++, С#, Java, Python, Go, JavaScript/TypeScript.
Для подключения с использованием WebSocket вам нужно указывать следующий адрес:<адрес publicApi из файла свертки>/kics4net/api/ /v3/tag-values
При этом указываемый протокол в строке адреса зависит от используемой функциональности для подключения.
Если используется библиотека SignalR Core, строка адреса начинается с https://
. Например:https://kics-server:8080/kics4net/api/ /v3/tag-values
Если используются стандартные функции WebSocket, в строке адреса нужно заменить https
на wss
. Например: wss://kics-server:8080/kics4net/api/v3/tag-values
Если при подключении не предоставлен токен аутентификации (или предоставленный токен не прошел проверку), в ответ на открытие подключения сервер возвращает код 401.
Создание подписки на значения тегов
Для создания подписки требуется выполнить запрос с именем метода GetTagValuesStream
.
Пример аргумента запроса: { "tagIdentifiers": [ { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" }, { "tagName": "Asdu_1_object_1003", "assetName": "Asset 079" } ], "streamConfig": { "samplingRateHz": 1 } } |
Аргумент запроса состоит из следующих полей:
tagIdentifiers
– массив идентификаторов тегов, значения которых нужно получать по подписке.assetName
,tagName
– значения, представляющие имя устройства и имя тега (используются для идентификации тега, на значения которого нужно создать подписку).samplingRateHz
– частота сэмплирования значений тегов (используется для уменьшения объема передаваемых данных). Если для поля задано нулевое значение, сэмплирование не выполняется.
Если аргумент создания подписки не удовлетворяет требованиям к полям, возвращается ошибка с описанием проблемы.
Пример ошибки для аргумента создания подписки:
|
Подтверждение подписки
При подтверждении подписки сервер возвращает по одному результату подтверждения для каждого тега, подходящего под значения tagIdentifiers
в запросе.
Пример подтверждения подписки: { "confirmation": { "result": "ok", "tagIdentifier": { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" }, "tagId": 102 } } |
Ответ с подтверждением подписки состоит из следующих полей:
result
– статус подписки на значение тега. Может принимать значения:ok
– подписка успешно создана;notFound
– тег с указаннымиassetName
,tagName
не найден.
tagIdentifier
– идентификатор тега, аналогичный одному значению из массиваtagIdentifiers
аргументов запроса на создание подписки.tagId
– уникальный идентификатор тега в программе. Может быть использован для получения информации о теге через Kaspersky Industrial CyberSecurity for Networks API, а также для идентификации тега в ответе с его значениями.
Значения тегов по подписке
Программа отправляет значения тегов по подписке в структуре полей. На верхнем уровне в структуре представлены следующие поля:
{
"value": {
"tagId": <
уникальный идентификатор тега в программе
>,
"tagValue": "<
JSON-объект с данными тега
>"
}
}
Информация о новом значении тега отправляется в стороннее приложение в формате JSON. Отправляемый объект с данными содержит следующие поля:
n
– тип данных тега, представленный именем изTagStructure
.ts
– время регистрации последнего обновления значений тега. Указывается в микросекундах от 01.01.1970.dn
– направление передачи. Может принимать значения:r
,w
,rw
.mp
– идентификатор точки мониторинга.d
– содержимое полей тега.
Атрибут d
представляет словарь, в котором каждый ключ является именем поля тега нулевой иерархии. Значение каждого поля имеет следующие атрибуты:
t
– обязательный атрибут, указывающий один из следующих типов данных:u
– UINT64.i
– INT64.b
– BOOL.d
– DOUBLE.s
– строка UTF8.t
– время в микросекундах от 01.01.1970.e
– ENUM. Дополнительно поле содержит следующие атрибуты:n
– имя типа ENUM;- v – исходное значение ENUM;
- s – строковое значение ENUM.
st
– структура.un
– UNION.
v
– обязательный атрибут, указывающий значение поля тега.n
– имя типа ENUM изTagStructure
(только для типаe
– ENUM).s
– строковое значение ENUM (только для типаe
– ENUM).Пример:
- enum:
name: OpType #
Имя типа ENUM (атрибут 'n')
data:
0: NUL #
0 запишется в атрибут 'v', NUL запишется в 's'
1: PULSE_ON
2: PULSE_OFF
x
– идентифицирует основное значение тега.Формат:
"x": 1
Для всех остальных полей тега атрибут
x
отсутствует.- m – специальный маркер параметра тега. Соответствует атрибуту marker со следующими полями:
q
– значение атрибута качества.ts
– статус метки времени, отображающий её правильность, временное состояние или причину ошибки при проверке.ds
– статус данных.o
– источник, от которого пришло значение или команда.t
– время последнего обновления значений тега, взятое из трафика.ct
– причина передачи.
Формат:
"m": "q"
Пример отправляемого значения тега в формате JSON:
{
"n": "TagStructure1",
"ts": 18446744073709551616,
"dn": "r",
"mp": 1,
"d":
{
"value":
{
"t": "d",
"v": 3.1415,
"x": 1
},
"quality":
{
"t": "s",
"v": "good",
"m": "q"
},
"mask":
{
"t": "u",
"v": 18446744073709551616
},
"enumfield":
{
"t": "e",
"n": "SwitchState",
"v": 0,
"s": "Off"
},
"strucfield":
{
"t": "st",
"v":
{
"v1":
{
"t": "d",
"v": 3.1415
},
"q2":
{
"t": "s",
"v": "good",
"m": "q"
}
}
},
"unionfield":
{
"t": "un",
"v":
{
"_":
{
"t": "u",
"v": 42
},
"low4bits":
{
"t": "u",
"v": 10
},
"high4bits":
{
"t": "u",
"v": 2
}
}
}
}
}
Примеры получения значений тегов по подписке
Ниже представлен пример получения значений тегов по подписке с использованием стандартных функций WebSocket на языке Python.
Предварительно требуется выполнить команду:pip install websocket_client
Пример подписки с использованием стандартных функций WebSocket: import json, ssl, websocket
def on_message(ws, message): print(message)
def on_error(ws, error): print(f' error: {error}')
def on_close(ws): print("### closed ###")
def on_open(ws): print("connection opened and handshake received ready to send messages")
# all sent messages must end with this character message_separator = chr(30)
# setting up json as messages format protocol_selection_args = { 'protocol': 'json', 'version': 1 } ws.send(json.dumps(protocol_selection_args) + message_separator)
# creating subscription args = { 'arguments': [ { 'tagIdentifiers': [ { 'tagName': 'tag_01', 'assetName': 'asset_02' } ], 'streamConfig': { 'samplingRateHz': 5 } } ], 'invocationId': '0', # will be included in response message 'target': 'getTagValuesStream', 'type': 4 # must be equal to 4 for outgoing messages }
ws.send(json.dumps(args) + message_separator)
def login(): token = "you should get access token for API here" return token
if __name__ == "__main__": server_url = "wss://localhost:8091/kics4net/api/tag-values" auth = "Authorization: Bearer " + login()
# for troubleshooting uncomment next line # websocket.enableTrace(True) ws = websocket.WebSocketApp(server_url, on_message=on_message, on_error=on_error, on_close=on_close, header=[auth])
print(f'opening connection to {server_url}') ws.on_open = on_open ws.run_forever( # use it only if Server has self-signed certificate sslopt={"cert_reqs": ssl.CERT_NONE} ) |
Ниже представлен пример получения значений тегов по подписке с использованием библиотеки SignalR Core на языке Python.
Предварительно требуется выполнить команду:pip install signalrcore
Пример подписки с использованием библиотеки SignalR Core: import logging from signalrcore.hub_connection_builder import HubConnectionBuilder
TOKEN = 'you should get access token for API here’ IP = '192.168.0.7' PORT = '8080' HUB = 'kics4net/api/v3/tag-values'
class WebsocketConnection(HubConnectionBuilder): def __init__(self, url: str = None, options: dict = None, verify_ssl: bool = False): super().__init__() self.with_url(url, options=options) self.configure_logging(logging.WARNING) self.with_automatic_reconnect({ "type": "raw", "keep_alive_interval": 10, "reconnect_interval": 5, "max_attempts": 5 }) self.verify_ssl = verify_ssl
def on_tag_stream_value(self, m): result.append(m) print(f'on_new_tag_value, {m}')
def on_tag_strean_error(self, e): print(f'onError, {e}')
def on_tag_stream_complete(self, q): print(f'onComplete, {q}')
def subscribe_tags(self): print("connection opened and handshake received ready to send messages")
args = { 'tagIdentifiers': [ { 'tagName': 'tag_01', 'assetName': 'asset_02'} ], 'streamConfig': { 'samplingRateHz': 5 } }
self.stream("GetTagValuesStream", [args]) \ .subscribe({ "next": self.on_tag_stream_value, "complete": self.on_tag_stream_complete, "error": self.on_tag_strean_error })
def main(): server_url = "https://{}:{}/{}".format(IP, PORT, HUB) login = 'bearer {}'.format(TOKEN)
conn = WebsocketConnection(url=server_url, options={"headers": {"authorization": login}}) conn.build()
logging.info(f'opening connection to {server_url}') conn.on_open(conn.subscribe_tags) conn.start()
logging.info('closing connection') conn.stop()
if __name__ == '__main__': main() |