Подписка на уведомления о значениях тега по протоколу WebSocket

22 марта 2024

ID 212101

При использовании Kaspersky Industrial CyberSecurity for Networks API стороннее приложение может создавать подписку на уведомления об изменении значений определенного тега. Для создания подписки и получения уведомлений используется протокол WebSocket.

Сценарий подписки для стороннего приложения состоит из следующих этапов:

  1. Стороннее приложение устанавливает соединение с Сервером Kaspersky Industrial CyberSecurity for Networks через коннектор для этого приложения с использованием сервера REST API.

    После успешного соединения с Сервером коннектору отправляется токен аутентификации. Коннектор использует токен аутентификации для всех последующих взаимодействий с Сервером в этом сеансе (в частности для запроса своей конфигурации с Сервера).

  2. Стороннее приложение подключается с использованием WebSocket и отправляет запрос для создания подписки на уведомления о получении значений нужного тега.

    Сервер Kaspersky Industrial CyberSecurity for Networks принимает запрос и создает подписку. Для отправки запроса используются соответствующие функции, предусмотренные протоколом WebSocket.

  3. Kaspersky Industrial CyberSecurity for Networks обнаруживает в трафике новое значение при чтении или записи тега.
  4. Kaspersky Industrial CyberSecurity for Networks отправляет полученное значение тега стороннему приложению, для которого действует подписка на уведомления о получении значений этого тега.

Основные особенности реализации подписки:

  • После того, как стороннее приложение указывает нужные теги Kaspersky Industrial CyberSecurity for Networks, Сервер отправляет подтверждение о возможности получать значения этих тегов. Далее стороннее приложение ожидает поступление значений этих тегов по установленному соединению.
  • Для создания и сопровождения подписки используется протокол WebSocket и соединение по тому же адресу, который используется сервером REST API.
  • Сервер Kaspersky Industrial CyberSecurity for Networks поддерживает не более одной активной подписки на значения тегов. Если активная подписка уже создана и используется, при попытке создания еще одной подписки возвращается ошибка о слишком большом количестве подключений.
  • Стороннее приложение имеет возможность в любой момент закрыть установленное соединение по подписке для прекращения получения значений тега.
  • Подписка прекращается либо после ее закрытия сторонним приложением, либо при разрыве соединения. Если Сервер Kaspersky Industrial CyberSecurity for Networks был временно недоступен (разорвано соединение) и отправка значений по подписке не выполнялась, то после восстановления соединения стороннему приложению потребуется заново подписаться на получение значений тегов.

Подключение с использованием WebSocket

Для получения тегов по подписке могут использоваться как стандартные функции WebSocket, так и библиотека SignalR Core. Пакеты для работы с библиотекой SignalR Core доступны для наиболее распространенных языков программирования: С++, С#, Java, Python, Go, JavaScript/TypeScript.

Для подключения с использованием WebSocket вам нужно указывать следующий адрес:
<адрес publicApi из файла свертки>/kics4net/api/v4/tag-values

При этом указываемый протокол в строке адреса зависит от используемой функциональности для подключения.

Если используется библиотека SignalR Core, строка адреса начинается с https://. Например:
https://kics-server:8080/kics4net/api/v4/tag-values

Если используются стандартные функции WebSocket, в строке адреса нужно заменить https на wss. Например:
wss://kics-server:8080/kics4net/api/v4/tag-values

Если при подключении не предоставлен токен аутентификации (или предоставленный токен не прошел проверку), в ответ на открытие подключения сервер возвращает код 401.

Создание подписки на значения тегов

Для создания подписки требуется выполнить запрос с именем метода GetTagValuesStream.

Пример аргумента запроса:

{

"tagIdentifiers": [

{ "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },

{ "tagName": "Asdu_1_object_1003", "assetName": "Asset 079" }

],

"streamConfig": {

"samplingRateHz": 1

}

}

Аргумент запроса состоит из следующих полей:

  • tagIdentifiers – массив идентификаторов тегов, значения которых нужно получать по подписке.
  • assetName, tagName – значения, представляющие имя устройства и имя тега (используются для идентификации тега, на значения которого нужно создать подписку).
  • samplingRateHz – частота сэмплирования значений тегов (используется для уменьшения объема передаваемых данных). Если для поля задано нулевое значение, сэмплирование не выполняется.

Если аргумент создания подписки не удовлетворяет требованиям к полям, возвращается ошибка с описанием проблемы.

Пример ошибки для аргумента создания подписки:

HubException: GetTagValuesStreamRequest has validation errors:

TagIdentifiers:

The TagName field is required.

The StreamConfig field is required.

Подтверждение подписки

При подтверждении подписки сервер возвращает по одному результату подтверждения для каждого тега, подходящего под значения tagIdentifiers в запросе.

Пример подтверждения подписки:

{

"confirmation": {

"result": "ok",

"tagIdentifier": { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },

"tagId": 102

}

}

Ответ с подтверждением подписки состоит из следующих полей:

  • result – статус подписки на значение тега. Может принимать значения:
    • ok – подписка успешно создана;
    • notFound – тег с указанными assetName, tagName не найден.
  • tagIdentifier – идентификатор тега, аналогичный одному значению из массива tagIdentifiers аргументов запроса на создание подписки.
  • tagId – уникальный идентификатор тега в программе. Может быть использован для получения информации о теге через Kaspersky Industrial CyberSecurity for Networks API, а также для идентификации тега в ответе с его значениями.

Значения тегов по подписке

Программа отправляет значения тегов по подписке в структуре полей. На верхнем уровне в структуре представлены следующие поля:

{

    "value": {

        "tagId": <уникальный идентификатор тега в программе>,

        "tagValue": "<JSON-объект с данными тега>"

    }

}

Информация о новом значении тега отправляется в стороннее приложение в формате JSON. Отправляемый объект с данными содержит следующие поля:

  • n – тип данных тега, представленный именем из TagStructure.
  • ts – время регистрации последнего обновления значений тега. Указывается в микросекундах от 01.01.1970.
  • dn – направление передачи. Может принимать значения: r, w, rw.
  • mp – идентификатор точки мониторинга.
  • d – содержимое полей тега.

Атрибут d представляет словарь, в котором каждый ключ является именем поля тега нулевой иерархии. Значение каждого поля имеет следующие атрибуты:

  • t – обязательный атрибут, указывающий один из следующих типов данных:
    • u – UINT64.
    • i – INT64.
    • b – BOOL.
    • d – DOUBLE.
    • s – строка UTF8.
    • t – время в микросекундах от 01.01.1970.
    • e – ENUM. Дополнительно поле содержит следующие атрибуты:
      • n – имя типа ENUM;
      • v – исходное значение ENUM;
      • s – строковое значение ENUM.
    • st – структура.
    • un – UNION.
  • v – обязательный атрибут, указывающий значение поля тега.
  • n – имя типа ENUM из TagStructure (только для типа e – ENUM).
  • s – строковое значение ENUM (только для типа e – ENUM).

    Пример:

    - enum:

        name: OpType # Имя типа ENUM (атрибут 'n')

        data:

            0: NUL # 0 запишется в атрибут 'v', NUL запишется в 's'

            1: PULSE_ON

            2: PULSE_OFF

  • x – идентифицирует основное значение тега.

    Формат: "x": 1

    Для всех остальных полей тега атрибут x отсутствует.

  • m – специальный маркер параметра тега. Соответствует атрибуту marker со следующими полями:
    • q – значение атрибута качества.
    • ts – статус метки времени, отображающий её правильность, временное состояние или причину ошибки при проверке.
    • ds – статус данных.
    • o – источник, от которого пришло значение или команда.
    • t – время последнего обновления значений тега, взятое из трафика.
    • ct – причина передачи.

    Формат: "m": "q"

    Пример отправляемого значения тега в формате JSON:

    {

        "n": "TagStructure1",

        "ts": 18446744073709551616,

        "dn": "r",

        "mp": 1,

        "d":

        {

            "value":

            {

                "t": "d",

                "v": 3.1415,

                "x": 1

            },

            "quality":

            {

                "t": "s",

                "v": "good",

                "m": "q"

            },

            "mask":

            {

                "t": "u",

                "v": 18446744073709551616

            },

            "enumfield":

            {

                "t": "e",

                "n": "SwitchState",

                "v": 0,

                "s": "Off"

            },

            "strucfield":

            {

                "t": "st",

                "v":

                {

                    "v1":

                    {

                        "t": "d",

                        "v": 3.1415

                    },

                    "q2":

                    {

                        "t": "s",

                        "v": "good",

                        "m": "q"

                    }

                }

            },

            "unionfield":

            {

                "t": "un",

                "v":

                {

                    "_":

                    {

                        "t": "u",

                        "v": 42

                    },

                    "low4bits":

                    {

                        "t": "u",

                        "v": 10

                    },

                    "high4bits":

                    {

                        "t": "u",

                        "v": 2

                    }

                }

            }

        }

    }

Примеры получения значений тегов по подписке

Ниже представлен пример получения значений тегов по подписке с использованием стандартных функций WebSocket на языке Python.

Предварительно требуется выполнить команду:
pip install websocket_client

Пример подписки с использованием стандартных функций WebSocket:

import json, ssl, websocket

def on_message(ws, message):

print(message)

def on_error(ws, error):

print(f' error: {error}')

def on_close(ws):

print("### closed ###")

def on_open(ws):

print("connection opened and handshake received ready to send messages")

# all sent messages must end with this character

message_separator = chr(30)

# setting up json as messages format

protocol_selection_args = {

'protocol': 'json',

'version': 1

}

ws.send(json.dumps(protocol_selection_args) + message_separator)

# creating subscription

args = {

'arguments': [

{

'tagIdentifiers': [

{

'tagName': 'tag_01',

'assetName': 'asset_02'

}

],

'streamConfig': {

'samplingRateHz': 5

}

}

],

'invocationId': '0', # will be included in response message

'target': 'getTagValuesStream',

'type': 4 # must be equal to 4 for outgoing messages

}

ws.send(json.dumps(args) + message_separator)

def login():

token = "you should get access token for API here"

return token

if __name__ == "__main__":

server_url = "wss://localhost:8091/kics4net/api/tag-values"

auth = "Authorization: Bearer " + login()

# for troubleshooting uncomment next line

# websocket.enableTrace(True)

ws = websocket.WebSocketApp(server_url,

on_message=on_message,

on_error=on_error,

on_close=on_close,

header=[auth])

print(f'opening connection to {server_url}')

ws.on_open = on_open

ws.run_forever(

# use it only if Server has self-signed certificate

sslopt={"cert_reqs": ssl.CERT_NONE}

)

Ниже представлен пример получения значений тегов по подписке с использованием библиотеки SignalR Core на языке Python.

Предварительно требуется выполнить команду:
pip install signalrcore

Пример подписки с использованием библиотеки SignalR Core:

import logging

from signalrcore.hub_connection_builder import HubConnectionBuilder

TOKEN = 'you should get access token for API here’

IP = '192.168.0.7'

PORT = '8080'

HUB = 'kics4net/api/v4/tag-values'

class WebsocketConnection(HubConnectionBuilder):

def __init__(self, url: str = None, options: dict = None, verify_ssl: bool = False):

super().__init__()

self.with_url(url, options=options)

self.configure_logging(logging.WARNING)

self.with_automatic_reconnect({

"type": "raw",

"keep_alive_interval": 10,

"reconnect_interval": 5,

"max_attempts": 5

})

self.verify_ssl = verify_ssl

def on_tag_stream_value(self, m):

result.append(m)

print(f'on_new_tag_value, {m}')

def on_tag_strean_error(self, e):

print(f'onError, {e}')

def on_tag_stream_complete(self, q):

print(f'onComplete, {q}')

def subscribe_tags(self):

print("connection opened and handshake received ready to send messages")

args = {

'tagIdentifiers': [

{

'tagName': 'tag_01',

'assetName': 'asset_02'}

],

'streamConfig': {

'samplingRateHz': 5

}

}

self.stream("GetTagValuesStream", [args]) \

.subscribe({

"next": self.on_tag_stream_value,

"complete": self.on_tag_stream_complete,

"error": self.on_tag_strean_error

})

def main():

server_url = "https://{}:{}/{}".format(IP, PORT, HUB)

login = 'bearer {}'.format(TOKEN)

conn = WebsocketConnection(url=server_url, options={"headers": {"authorization": login}})

conn.build()

logging.info(f'opening connection to {server_url}')

conn.on_open(conn.subscribe_tags)

conn.start()

logging.info('closing connection')

conn.stop()

if __name__ == '__main__':

main()

Вам помогла эта статья?
Что нам нужно улучшить?
Спасибо за ваш отзыв, вы помогаете нам становиться лучше!
Спасибо за ваш отзыв, вы помогаете нам становиться лучше!