信雅纳网络测试的二次开发集成:XOA(Xena Open-Source Automation)开源自动化测试

本文主要是介绍信雅纳网络测试的二次开发集成:XOA(Xena Open-Source Automation)开源自动化测试,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

XOA是什么

XOA CLI

XOA Python API

​XOA Python Test Suite/测试套件

XOA Converter

Source Code


XOA是什么

XOA(Xena Open-Source Automation)是一个开源的测试自动化框架,追求“高效、易用、灵活”的跨操作系统的开发框架。能与Xena现有解决方案无缝配合,借助XOA可调用Xena(Z系列打流仪、E系列损伤仪)完成自动化测试任务。同时它提供了操作接口可将其他仪表/设备做并栈集成测试验证,统一整理输出测试报告。

XOA包含:
XOA CLI、XOA Python API、XOA Python TestSuite、XOA Converter

信雅纳:网络测试自动化

XOA CLI

XOA CLI 提供了一套简洁直观的基于文本语言的独立命令,用以控制和集成 Xena测试仪硬件,实现各种测试任务的自动化。
任何客户端平台/编程语言(如 Python、Tcl、Bash)都可与 XOA CLI 配合使用。
CLI 可在远程登录终端上使用,直接向 Xena 测试仪发送命令。
ValkyrieManager通过测试端口配置文件(.xpc ),可在 XOA CLI 环境之间无缝转换。

XOA Python API

XOA Python API与XOA CLI和XenaManager无缝集成

  • 面向对象的高级抽象: XOA Python API采用面向对象的方法,提供了更高层次的抽象,加快了自动化脚本的开发。
  • 集成开发环境自动完成,内置手册: XOA Python API 包含 IDE 自动完成以及类、函数和 API 内置手册等功能,可显著提高开发效率。
  • 命令分组和响应自动匹配:该功能允许命令分组和响应自动匹配,从而优化了测试执行效率。
  • 服务器到客户端推送通知订阅: XOA Python API 支持服务器到客户端的推送通知订阅,降低了用户代码的复杂性。
  • 支持 Python 3.8 及更高版本: XOA Python API 兼容 Python 3.8 及更高版本,确保与现代 Python 环境兼容。
    import asyncio
    from contextlib import suppress
    from xoa_driver import testers
    from xoa_driver import modules
    from xoa_driver import ports
    from xoa_driver import utils
    from xoa_driver import enums
    from xoa_driver import exceptions
    from ipaddress import IPv4Address, IPv6Address
    from binascii import hexlify
    from xoa_driver.misc import Hex#---------------------------
    # Global parameters
    #---------------------------CHASSIS_IP = "10.165.16.70"      # Chassis IP address or hostname
    USERNAME = "XOA"                # Username
    MODULE_INDEX = 4                # Module index
    TX_PORT_INDEX = 0               # TX Port indexFRAME_SIZE_BYTES = 4178         # Frame size on wire including the FCS.
    FRAME_COUNT = 20              # The number of frames including the first, the middle, and the last.
    REPETITION = 1                  # The number of repetitions of the frame sequence, set to 0 if you want the port to repeat over and over
    TRAFFIC_RATE_FPS = 100          # Traffic rate in frames per second
    TRAFFIC_RATE_PERCENT = int(4/10 * 1000000)SHOULD_BURST = False            # Whether the middle frames should be bursty
    BURST_SIZE_FRAMES = 9           # Burst size in frames for the middle frames
    INTER_BURST_GAP_BYTES = 3000    # The inter-burst gap in bytes
    INTRA_BURST_GAP_BYTES = 1000    # The inter-frame gap within a burst, aka. intra-burst gap, in bytes#---------------------------
    # Header content for streams
    #---------------------------
    class Ethernet:def __init__(self):self.dst_mac = "0000.0000.0000"self.src_mac = "0000.0000.0000"self.ethertype = "86DD"def __str__(self):_dst_mac = self.dst_mac.replace(".", "")_src_mac = self.src_mac.replace(".", "")_ethertype = self.ethertypereturn f"{_dst_mac}{_src_mac}{_ethertype}".upper()class IPV4:def __init__(self):self.version = 4self.header_length = 5self.dscp = 0self.ecn = 0self.total_length = 42self.identification = "0000"self.flags = 0self.offset = 0self.ttl = 255self.proto = 255self.checksum = "0000"self.src = "0.0.0.0"self.dst = "0.0.0.0"def __str__(self):_ver = '{:01X}'.format(self.version)_header_length = '{:01X}'.format(self.header_length)_dscp_ecn = '{:02X}'.format((self.dscp<<2)+self.ecn)_total_len = '{:04X}'.format(self.total_length)_ident = self.identification_flag_offset = '{:04X}'.format((self.flags<<13)+self.offset)_ttl = '{:02X}'.format(self.ttl)_proto = '{:02X}'.format(self.proto)_check = self.checksum_src = hexlify(IPv4Address(self.src).packed).decode()_dst = hexlify(IPv4Address(self.dst).packed).decode()return f"{_ver}{_header_length}{_dscp_ecn}{_total_len}{_ident}{_flag_offset}{_ttl}{_proto}{_check}{_src}{_dst}".upper()class IPV6:def __init__(self):self.version = 6self.traff_class = 8self.flow_label = 0self.payload_length = 0self.next_header = "11"self.hop_limit = 1self.src = "2000::2"self.dst = "2000::100"def __str__(self):_ver = '{:01X}'.format(self.version)_traff_class = '{:01X}'.format(self.traff_class)_flow_label = '{:06X}'.format(self.flow_label)_payload_len = '{:04X}'.format(self.payload_length)_next_header = self.next_header_hop_limit = '{:02X}'.format(self.hop_limit)_src = hexlify(IPv6Address(self.src).packed).decode()_dst = hexlify(IPv6Address(self.dst).packed).decode()return f"{_ver}{_traff_class}{_flow_label}{_payload_len}{_next_header}{_hop_limit}{_src}{_dst}".upper()class UDP:def __init__(self):self.src_port = 0self.dst_port = 0self.length = 0self.checksum = 0def __str__(self):_src_port = '{:04X}'.format(self.src_port)_dst_port = '{:04X}'.format(self.dst_port)_length = '{:04X}'.format(self.length)_checksum = '{:04X}'.format(self.checksum)return f"{_src_port}{_dst_port}{_length}{_checksum}".upper()class ROCEV2:def __init__(self):self.opcode = 0self.solicited_event = 0self.mig_req = 0self.pad_count = 1self.header_version = 0self.partition_key = 65535self.reserved = 7self.dest_queue_pair = 2self.ack_request = 0self.reserved_7bits = 0self.packet_seq_number =0def __str__(self):_opcode = '{:02X}'.format(self.opcode)_combo_1 = '{:02X}'.format((self.solicited_event<<7)+(self.mig_req<<6)+(self.pad_count<<4)+self.header_version)_pk = '{:04X}'.format(self.partition_key)_reserved = '{:02X}'.format(self.reserved)_qp = '{:06X}'.format(self.dest_queue_pair)_combo_2 = '{:02X}'.format((self.ack_request<<7)+self.reserved_7bits)_ps = '{:06X}'.format(self.packet_seq_number)return f"{_opcode}{_combo_1}{_pk}{_reserved}{_qp}{_combo_2}{_ps}".upper()#------------------------------
    # def my_awesome_func()
    #------------------------------
    async def my_awesome_func(stop_event: asyncio.Event, should_burst: bool) -> None:"""This Python function uses XOA Python API to configure the TX port:param stop_event::type stop_event: asyncio.Event:param should_burst: Whether the middle frames should be bursty.:type should_burst: bool"""# create tester instance and establish connectiontester = await testers.L23Tester(CHASSIS_IP, USERNAME, enable_logging=False) # access the module on the testermodule = tester.modules.obtain(MODULE_INDEX)# check if the module is of type Loki-100G-5S-2Pif not isinstance(module, modules.ModuleChimera):# access the txport on the moduletxport = module.ports.obtain(TX_PORT_INDEX)#---------------------------# Port reservation#---------------------------print(f"#---------------------------")print(f"# Port reservation")print(f"#---------------------------")if txport.is_released():print(f"The txport is released (not owned by anyone). Will reserve the txport to continue txport configuration.")await txport.reservation.set_reserve() # set reservation , means txport will be controlled by our sessionelif not txport.is_reserved_by_me():print(f"The txport is reserved by others. Will relinquish and reserve the txport to continue txport configuration.")await txport.reservation.set_relinquish() # send relinquish the txportawait txport.reservation.set_reserve() # set reservation , means txport will be controlled by our session#---------------------------# Start port configuration#---------------------------print(f"#---------------------------")print(f"# Start port configuration")print(f"#---------------------------")print(f"Reset the txport")await txport.reset.set()print(f"Configure the txport")await utils.apply(# txport.speed.mode.selection.set(mode=enums.PortSpeedMode.F100G),txport.comment.set(comment="RoCE2 on Loki"),txport.tx_config.enable.set_on(),txport.latency_config.offset.set(offset=0),txport.latency_config.mode.set(mode=enums.LatencyMode.LAST2LAST),txport.tx_config.burst_period.set(burst_period=0),txport.tx_config.packet_limit.set(packet_count_limit=FRAME_COUNT*REPETITION),txport.max_header_length.set(max_header_length=128),txport.autotrain.set(interval=0),txport.loop_back.set_none(),                                # If you want loopback the port TX to its own RX, change it to set_txoff2rx()txport.checksum.set(offset=0),txport.tx_config.delay.set(delay_val=0),txport.tpld_mode.set_normal(),txport.payload_mode.set_normal(),#txport.rate.pps.set(port_rate_pps=TRAFFIC_RATE_FPS),       # If you want to control traffic rate with FPS, uncomment this.txport.rate.fraction.set(TRAFFIC_RATE_PERCENT),                          # If you want to control traffic rate with fraction, uncomment this. 1,000,000 = 100%)if should_burst:await txport.tx_config.mode.set_burst()else:await txport.tx_config.mode.set_sequential()#--------------------------------------# Configure stream_0 on the txport#--------------------------------------print(f"   Configure first-packet stream on the txport")stream_0 = await txport.streams.create()eth = Ethernet()eth.src_mac = "aaaa.aaaa.0005"eth.dst_mac = "bbbb.bbbb.0005"ipv4 = IPV4()ipv4.src = "1.1.1.5"ipv4.dst = "2.2.2.5"ipv6 = IPV6()ipv6.src = "2001::5"ipv6.dst = "2002::5"udp = UDP()udp.src_port = 4791udp.dst_port = 4791rocev2 = ROCEV2()rocev2.opcode = 0rocev2.dest_queue_pair = 2rocev2.packet_seq_number = 0await utils.apply(stream_0.enable.set_on(),stream_0.packet.limit.set(packet_count=1),stream_0.comment.set(f"First packet"),stream_0.rate.fraction.set(stream_rate_ppm=10000),stream_0.packet.header.protocol.set(segments=[enums.ProtocolOption.ETHERNET,enums.ProtocolOption.IPV6,enums.ProtocolOption.UDP,enums.ProtocolOption.RAW_12,]),stream_0.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),stream_0.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),stream_0.payload.content.set(payload_type=enums.PayloadType.PATTERN, hex_data=Hex("AABBCCDD")),stream_0.tpld_id.set(test_payload_identifier = 0),stream_0.insert_packets_checksum.set_on())if should_burst:await stream_0.burst.burstiness.set(size=1, density=100)await stream_0.burst.gap.set(inter_packet_gap=0, inter_burst_gap=0)#--------------------------------------# Configure stream_1 on the txport#--------------------------------------print(f"   Configure middle-packets stream on the txport")stream_1 = await txport.streams.create()rocev2.opcode = 1rocev2.dest_queue_pair = 2rocev2.packet_seq_number = 1await utils.apply(stream_1.enable.set_on(),stream_1.packet.limit.set(packet_count=FRAME_COUNT-2),stream_1.comment.set(f"Middle packets"),stream_1.rate.fraction.set(stream_rate_ppm=10000),stream_1.packet.header.protocol.set(segments=[enums.ProtocolOption.ETHERNET,enums.ProtocolOption.IPV6,enums.ProtocolOption.UDP,enums.ProtocolOption.RAW_12,]),stream_1.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),stream_1.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),stream_1.payload.content.set(payload_type=enums.PayloadType.PATTERN, hex_data=Hex("AABBCCDD")),stream_1.tpld_id.set(test_payload_identifier = 1),stream_1.insert_packets_checksum.set_on())if should_burst:await stream_1.burst.burstiness.set(size=BURST_SIZE_FRAMES, density=100)await stream_1.burst.gap.set(inter_packet_gap=INTRA_BURST_GAP_BYTES, inter_burst_gap=INTER_BURST_GAP_BYTES)# Configure a modifier on the stream_1await stream_1.packet.header.modifiers.configure(1)# Modifier on the SQNmodifier = stream_1.packet.header.modifiers.obtain(0)await modifier.specification.set(position=72, mask="FFFF0000", action=enums.ModifierAction.INC, repetition=1)await modifier.range.set(min_val=1, step=1, max_val=FRAME_COUNT-2)#--------------------------------------# Configure stream_2 on the txport#--------------------------------------print(f"   Configure last-packet stream on the txport")stream_2 = await txport.streams.create()rocev2.opcode = 2rocev2.dest_queue_pair = 2rocev2.packet_seq_number = FRAME_COUNT-1await utils.apply(stream_2.enable.set_on(),stream_2.packet.limit.set(packet_count=1),stream_2.comment.set(f"Last packet"),stream_2.rate.fraction.set(stream_rate_ppm=10000),stream_2.packet.header.protocol.set(segments=[enums.ProtocolOption.ETHERNET,enums.ProtocolOption.IPV6,enums.ProtocolOption.UDP,enums.ProtocolOption.RAW_12,]),stream_2.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),stream_2.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),stream_2.payload.content.set(payload_type=enums.PayloadType.PATTERN, hex_data=Hex("AABBCCDD")),stream_2.tpld_id.set(test_payload_identifier = 2),stream_2.insert_packets_checksum.set_on())if should_burst:await stream_2.burst.burstiness.set(size=1, density=100)await stream_2.burst.gap.set(inter_packet_gap=0, inter_burst_gap=0)async def main():stop_event =asyncio.Event()try:await my_awesome_func(stop_event, should_burst=SHOULD_BURST)except KeyboardInterrupt:stop_event.set()if __name__=="__main__":asyncio.run(main())

信雅纳:Python XOA测试自动化
 

XOA Python Test Suite/测试套件

XOA Python 测试套件是一个测试框架,为开发人员和测试专家执行和集成 Xena 测试套件提供了定义明确的 API。
该框架以自动化方式处理各种任务,如测试资源管理、测试执行和发布测试结果。
每个 RFC 测试套件都被设计成独立的 "插件",可根据需要有选择性地集成到项目中。
目前,XOA Python 测试套件包括
- RFC2544
- RFC2889
- RFC3918

XOA Converter

如果您希望将当前的 Xena 测试套件配置快速迁移到 XOA,现在使用 XOA 转换器工具比以往任何时候都更容易。

以前,Xena的测试套件应用程序仅与Windows兼容。但今后,所有现有和未来的测试套件都将并入 XOA Python 测试套件,从而消除 Windows 限制。

为了简化过渡,我们推出了 XOA 转换器。该工具允许用户将现有的Xena测试套件配置(Xena2544、Xena2889和Xena3918)从Xena窗口桌面应用程序无缝迁移到XOA Python测试套件中。有了 XOA 转换器,迁移过程变得轻松简单。

信雅纳:网络测试仪自动化测试

Source Code

GitHub 是我们托管 XOA 源代码的首选平台,因为它具有出色的版本控制和协作能力。它为管理代码变更提供了一个极佳的环境,确保项目的历史记录完备且易于访问。我们崇尚开放,鼓励每个人使用、分享、贡献和反馈我们的源代码。GitHub 允许进行无缝协作,并促进以社区为导向的方法,让每个人都能积极参与 XOA 的开发和改进。我们重视来自社区的意见和贡献,因为这能提高源代码的整体质量和创新性。

  • XOA Python API Source Code
  • XOA Python Test Suite – Core Source Code
  • XOA Python Test Suite – Plugin Source Code
  • XOA ANLT Utility Source Code
  • XOA Converter Source Code

这篇关于信雅纳网络测试的二次开发集成:XOA(Xena Open-Source Automation)开源自动化测试的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/813877

相关文章

Python自动化处理PDF文档的操作完整指南

《Python自动化处理PDF文档的操作完整指南》在办公自动化中,PDF文档处理是一项常见需求,本文将介绍如何使用Python实现PDF文档的自动化处理,感兴趣的小伙伴可以跟随小编一起学习一下... 目录使用pymupdf读写PDF文件基本概念安装pymupdf提取文本内容提取图像添加水印使用pdfplum

SpringBoot集成XXL-JOB实现任务管理全流程

《SpringBoot集成XXL-JOB实现任务管理全流程》XXL-JOB是一款轻量级分布式任务调度平台,功能丰富、界面简洁、易于扩展,本文介绍如何通过SpringBoot项目,使用RestTempl... 目录一、前言二、项目结构简述三、Maven 依赖四、Controller 代码详解五、Service

基于Python实现自动化邮件发送系统的完整指南

《基于Python实现自动化邮件发送系统的完整指南》在现代软件开发和自动化流程中,邮件通知是一个常见且实用的功能,无论是用于发送报告、告警信息还是用户提醒,通过Python实现自动化的邮件发送功能都能... 目录一、前言:二、项目概述三、配置文件 `.env` 解析四、代码结构解析1. 导入模块2. 加载环

Python实战之SEO优化自动化工具开发指南

《Python实战之SEO优化自动化工具开发指南》在数字化营销时代,搜索引擎优化(SEO)已成为网站获取流量的重要手段,本文将带您使用Python开发一套完整的SEO自动化工具,需要的可以了解下... 目录前言项目概述技术栈选择核心模块实现1. 关键词研究模块2. 网站技术seo检测模块3. 内容优化分析模

springboot2.1.3 hystrix集成及hystrix-dashboard监控详解

《springboot2.1.3hystrix集成及hystrix-dashboard监控详解》Hystrix是Netflix开源的微服务容错工具,通过线程池隔离和熔断机制防止服务崩溃,支持降级、监... 目录Hystrix是Netflix开源技术www.chinasem.cn栈中的又一员猛将Hystrix熔

MyBatis-Plus 与 Spring Boot 集成原理实战示例

《MyBatis-Plus与SpringBoot集成原理实战示例》MyBatis-Plus通过自动配置与核心组件集成SpringBoot实现零配置,提供分页、逻辑删除等插件化功能,增强MyBa... 目录 一、MyBATis-Plus 简介 二、集成方式(Spring Boot)1. 引入依赖 三、核心机制

Python使用python-pptx自动化操作和生成PPT

《Python使用python-pptx自动化操作和生成PPT》这篇文章主要为大家详细介绍了如何使用python-pptx库实现PPT自动化,并提供实用的代码示例和应用场景,感兴趣的小伙伴可以跟随小编... 目录使用python-pptx操作PPT文档安装python-pptx基础概念创建新的PPT文档查看

基于Python编写自动化邮件发送程序(进阶版)

《基于Python编写自动化邮件发送程序(进阶版)》在数字化时代,自动化邮件发送功能已成为企业和个人提升工作效率的重要工具,本文将使用Python编写一个简单的自动化邮件发送程序,希望对大家有所帮助... 目录理解SMTP协议基础配置开发环境构建邮件发送函数核心逻辑实现完整发送流程添加附件支持功能实现htm

SpringBoot集成P6Spy的实现示例

《SpringBoot集成P6Spy的实现示例》本文主要介绍了SpringBoot集成P6Spy的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录本节目标P6Spy简介抛出问题集成P6Spy1. SpringBoot三板斧之加入依赖2. 修改

Python开发简易网络服务器的示例详解(新手入门)

《Python开发简易网络服务器的示例详解(新手入门)》网络服务器是互联网基础设施的核心组件,它本质上是一个持续运行的程序,负责监听特定端口,本文将使用Python开发一个简单的网络服务器,感兴趣的小... 目录网络服务器基础概念python内置服务器模块1. HTTP服务器模块2. Socket服务器模块