06 июля 2020 Время чтения: 7 минут

Пишем mock-сервер для коммутаторов Juniper

Дмитрий Витвицкий

Дмитрий Витвицкий

Программист

ISPSystem
как написать mock-сервер для коммутатора Juniper. Обложка к статье

Меня зовут Дмитрий, и я разработчик DCImanager — панели для управления оборудованием от ISPsystem. Довольно продолжительное время в команде я провёл, разрабатывая софт для управления коммутаторами. Вместе мы пережили взлеты и падения: от написания сервисов для управления железом до падения офисной сети и часовых свиданий в серверной в надежде не потерять своих любимых.

И вот настало время тестирования. Часть обработчиков мы смогли покрыть готовыми решениями для тестирования. Но с Juniper так не получилось. Ресерч и реализация послужили идеей для написания этой статьи.

DCImanager работает с разными видами оборудования: коммутаторы, распределители питания, серверы. Сейчас DCImanager поддерживает четыре обработчика коммутаторов. Два по протоколу SNMP (Cisco Catalyst и общий snmp common) и еще два по протоколу NETCONF (Juniper с поддержкой ELS и без).

Всю работу с оборудованием мы обильно покрываем тестами. Использовать для автоматического тестирования реальное оборудование не получается: тесты запускаются на каждый пуш и проходят параллельно. Поэтому мы стараемся использовать эмуляторы.

Обработчики с поддержкой протокола SNMP мы смогли покрыть тестами, используя библиотеку SNMP Agent Simulator. А вот с Juniper’ом возникли проблемы. Поискав готовые решения, выбрали пару библиотек, но одна из них не завелась, а другая делала не то, что нужно — я потратил больше времени на попытки оживить это чудо.

Встал вопрос, а как же эмулировать работу коммутаторов Juniper? Juniper работает по протоколу NETCONF, который, в свою очередь, работает поверх SSH. В голове промелькнула мысль написать небольшой сервис, который будет работать поверх SSH и эмулировать работу коммутатора. Соответственно, нам нужен сам сервис, а также «снимок» Juniper для эмуляции данных.

В snmpsim под снимком понимается полная копия состояния коммутатора, со всеми его поддерживаемыми OID и их текущими значениями.

В Juniper всё немного сложнее: такой снимок сделать не получится. Здесь под снимком будем понимать набор шаблонов типа: запрос-ответ.

Часть первая: архитектура посадки

Сейчас мы активно пополняем «зоопарк» обработчиков для работы с коммутаторами. Скоро у нас появятся новые обработчики, и не все из них мы сможем покрыть готовыми решениями для тестирования. Однако можно попробовать написать общую архитектуру сервиса, который будет имитировать работу различных устройств по разным протоколам.

В самом простом варианте — фабрика, которая в зависимости от протокола и обработчика (некоторые коммутаторы могут работать по нескольким протоколам), будет возвращать объект коммутатора, в котором уже будет реализована вся логика его поведения. В случае с Juniper, это небольшой синтаксический анализатор запроса. В зависимости от входного rpc-запроса с параметрами, он будет выполнять необходимые действия.

Важное ограничение: мы не сможем полностью имитировать работу коммутатора. На описание всей логики уйдёт много времени, а добавив новую функциональность в реальный обработчик, нам придется править и mock коммутатора.

Часть вторая: подбираем почву для посадки

Взгляд пал на библиотеку paramiko, которая предоставляет удобный интерфейс работы по протоколу SSH. Для начала хотелось не разносить архитектуру, а проверить базовые вещи, например, коннект и какой-нибудь простой запрос. Мы же всё-таки ресерчем занимаемся. Поэтому над авторизацией не заморачиваемся: простой ServerInterface и socket-сервер в связке дают нам что-то похожее на работающий вариант:


class SshServer(paramiko.ServerInterface):
   def check_auth_password(self, user, password):
       if user == SSH_USER_NAME and password == SSH_USER_PASSWORD:
           return paramiko.AUTH_SUCCESSFUL
       return paramiko.AUTH_FAILED

socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
socket.bind(("127.0.0.1", 8300))
socket.listen(10)

client, address = socket.accept()
session = paramiko.Transport(client)

server = SshServer()
session.start_server(server=server)

Примерная реализация того, что хотелось бы видеть, но выглядит страшно

При подключении клиента к серверу, второй должен ответить списком своих capabilities (возможностей). Например таким:


reply = """
    
     
      urn:ietf:params:xml:ns:netconf:base:1.0
      xml.juniper.net/netconf/junos/1.0
      xml.juniper.net/dmi/system/1.0
     
     1
    
    ]]>]]>
"""
socket.send(reply)

Да, это XML ]]>]]>

Если что, код работает нестабильно. В данной реализации есть проблема с закрытием сокета. Нашел пару зарегистрированных issues в paramiko с этой проблемой. Отложил ненадолго, решив проверить оставшийся вариант.

Часть третья: посадка

Козырь в рукаве — Twisted. Это фреймворк разработки сетевых приложений с поддержкой большого количества протоколов. У него есть обширная документация и замечательный модуль Cred, который нам и поможет.

Cred — это механизм аутентификации, позволяющий различным сетевым протоколам подключаться к системе в зависимости от ваших требований. Для организации всей логики используется Realm — часть приложения, отвечающая за бизнес-логику и доступ к ее объектам. Но обо всем по порядку.

Ядром входа в систему является Portal. Если мы хотим написать надстройку над сетевым протоколом, определяем стандартный Portal. В нём уже есть методы:

  • login (предоставляет доступ клиента к подсистеме)
  • registerChecker (непосредственно проверка учетных данных).

Для привязки бизнес-логики к системе аутентификации используется Realm-объект. Так как клиент уже авторизован, здесь начинается логика нашей надстройки над SSH. Данный интерфейс имеет всего один метод requestAvatar, который вызывается при успешной авторизации в Portal и возвращает основной объект — SwitchProtocolAvatar:


@implementer(portal.IRealm)
class SwitchRealm(object):
    def __init__(self, switch_obj):
        self.switch_obj = switch_obj

    def requestAvatar(self, avatarId, mind, *interfaces):
        return interfaces[0], SwitchProtocolAvatar(avatarId, switch_obj=self.switch_obj), lambda: None
Самая простая реализация Realm-объекта, возвращающая необходимый Avatar

За управление бизнес-логикой отвечают специальные объекты — Avatar`ы. В нашем случае здесь начинается надстройка над протоколом SSH. Когда отправляется запрос, данные попадают в SwitchProtocolAvatar, который проверяет подсистему запроса и обновляет конфигурацию:


class SwitchProtocolAvatar(avatar.ConchUser):
    def __init__(self, username, switch_core):
        avatar.ConchUser.__init__(self)
        self.username = username
        self.channelLookup.update({b'session': session.SSHSession})

        netconf_protocol = switch_core.get_netconf_protocol()
        if netconf_protocol:
            self.subsystemLookup.update({b'netconf': netconf_protocol})
Проверяем подсистему и обновляем конфигурацию при условии, что данный обработчик коммутатора работает по NETCONF

К слову о протоколах. Не забываем, что мы работаем с NETCONF, и приступаем к выполнению. Для написания надстроек над уже существующими протоколами и реализации своей логики используется Protocol. Интерфейс данного класса простой:

  • dataReceived — используется для обработки событий на получение данных;
  • makeConnection — используется для установки соединения;
  • сonnectionMade — используется, когда соединение уже установлено. Здесь можно определить некоторую логику до того, как клиент начнет присылать запросы. В нашем случае надо отправить список своих capabilities.

class Netconf(Protocol):
    def __init__(self, capabilities=None):
        self.session_count = 0
        self.capabilities = capabilities

    def __call__(self, *args, **kwargs):
        return self

    def connectionMade(self):
        self.session_count += 1
        self.send_capabilities()

    def send_capabilities(self):
        rpc_capabilities_reply = "{capabilities}" \
                                 "{session_id}]]>]]>"
        rpc_capabilities = "".join(f"{cap}" for cap in self.capabilities)
        
        self.transport.write(rpc_capabilities_reply.format(capabilities=rpc_capabilities, 
                                                           session_id=self.session_count))

    def dataReceived(self, data):
        # Process received data
        pass
Минимальная реализация обертки над протоколом. Убрал лишнюю логику для наглядности

Начинаем сворачивать нашу матрешку. Так как мы используем надстройку над SSH, то нам необходимо реализовать логику SSH-сервера. В нём мы определим ключи для сервера и обработчики для служб SSH. Реализация данного класса не сильно нас интересует, так как авторизация будет по паролю:


class SshServerFactory(factory.SSHFactory):
    protocol = SSHServerTransport
    
    publicKeys = {b'ssh-rsa': keys.Key.fromFile(SERVER_RSA_PUBLIC)}
    privateKeys = {b'ssh-rsa': keys.Key.fromFile(SERVER_RSA_PRIVATE)}

    services = {
        b'ssh-userauth': userauth.SSHUserAuthServer,
        b'ssh-connection': connection.SSHConnection
    }

    def getPrimes(self):
        return PRIMES
Реализация SSH-сервера

Для работы SSH-сервера необходимо определить логику сессий, которая работает вне зависимости от того, по какому протоколу к нам пришли, и какой интерфейс запрашивается:


class EchoProtocol(protocol.Protocol):
    def dataReceived(self, data):
        if data == b'\r':
            data = b'\r\n'
        elif data == b'\x03':  # Ctrl+C
            self.transport.loseConnection()
            return
        self.transport.write(data)


class Session:
    def __init__(self, avatar):
        pass

    def getPty(self, term, windowSize, attrs):
        pass

    def execCommand(self, proto, cmd):
        pass

    def openShell(self, transport):
        protocol = EchoProtocol()
        protocol.makeConnection(transport)
        transport.makeConnection(session.wrapProtocol(protocol))

    def eofReceived(self):
        pass

    def closed(self):
        pass
Логика сессий для всех описываемых интерфейсов

Чуть не забыл о самом обработчике. После всех проверок и авторизаций логика переходит к объекту, эмулирующему работу коммутатора. Тут можно определить логику обработки запросов: получение или редактирование интерфейсов, конфигурация устройства и т. д.


class Juniper:
    def __init__(self):
        self.protocol = Netconf(capabilities=self.capabilities())

    def get_netconf_protocol(self):
        return self.protocol

    @staticmethod
    def capabilities():
        return [
            "Candidate1_0urn:ietf:params:xml:ns:netconf:capability:candidate:1.0",
            "urn:ietf:params:xml:ns:netconf:capability:confirmed-commit:1.0",
            "urn:ietf:params:xml:ns:netconf:capability:validate:1.0",
            "urn:ietf:params:xml:ns:netconf:capability:url:1.0?protocol=http,ftp,file",
            "urn:ietf:params:netconf:capability:candidate:1.0",
            "urn:ietf:params:netconf:capability:confirmed-commit:1.0",
            "urn:ietf:params:netconf:capability:validate:1.0",
            "urn:ietf:params:netconf:capability:url:1.0?scheme=http,ftp,file"
        ]

Основная логика обработчика. Вырезал всю функциональность и обработку запросов, оставив только получение capabilities

Ну и наконец-то сращиваем всё это вместе. Регистрируем адаптер сессии (описывает поведение при подключении), определяем метод подключения по имени пользователя и паролю, настраиваем Portal и запускаем наш сервис:


components.registerAdapter(Session, SwitchProtocolAvatar, session.ISession)

switch_factory = SwitchFactory()
switch = switch_factory.get("juniper")

portal = portal.Portal(CustomRealm(switch))
credential_source = InMemoryUsernamePasswordDatabaseDontUse()
credential_source.addUser(b'admin', b'admin')
portal.registerChecker(credential_source)

SshServerFactory.portal = portal

reactor.listenTCP(830, SshServerFactory())
reactor.run()
Настройка и запуск сервера
Запускаем mock-сервер. Для проверки работоспособности можно подключиться с помощью библиотеки ncclient. Обычной проверки соединения и просмотра capabilities сервера хватит:

from ncclient import manager

connection = manager.connect(host="127.0.0.1",
                             port=830,
                             username="admin",
                             password="admin",
                             timeout=60,
                             device_params={'name': 'junos'},
                             hostkey_verify=False)

for capability in connection.server_capabilities:
   print(capability)
Подключаемся к mock-серверу по протоколу NETCONF и выводим список capabilities сервера

Сам результат запроса представлен ниже. Мы успешно установили соединение, а сервер отдал нам список своих capabilities:


 Candidate1_0urn:ietf:params:xml:ns:netconf:capability:candidate:1.0
urn:ietf:params:xml:ns:netconf:capability:confirmed-commit:1.0
urn:ietf:params:xml:ns:netconf:capability:validate:1.0
urn:ietf:params:xml:ns:netconf:capability:url:1.0?protocol=http,ftp,file
urn:ietf:params:netconf:capability:candidate:1.0
urn:ietf:params:netconf:capability:confirmed-commit:1.0
urn:ietf:params:netconf:capability:validate:1.0
urn:ietf:params:netconf:capability:url:1.0?scheme=http,ftp,file
Capabilities сервера

Заключение

У данного решения достаточно плюсов и минусов. С одной стороны, мы тратим много времени на реализацию и описание всей логики обработки запросов. С другой — получаем возможность гибкой настройки и эмуляции поведения. Но главное — это масштабируемость. Фреймворк Twisted обладает богатой функциональностью и поддерживает большое число протоколов, поэтому можно без проблем описывать новые интерфейсы обработчиков. А если всё хорошо продумать, данную архитектуру можно использовать не только для работы с коммутаторами, но и для другого оборудования.

Хотелось бы узнать мнение читателей. Делали ли вы подобное и если да, то какие технологии использовали и как выстраивали процесс тестирования?