diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index f6263e053..9474ebfb4 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -11,6 +11,9 @@ jobs:
test_novus_10g:
runs-on: [snappi-ixn-ci-novus10g]
steps:
+ - name: Set Timestamp
+ id: timestamp
+ run: echo "TIMESTAMP=$(TZ='Asia/Kolkata' date +"%D %T")" >> $GITHUB_ENV
- name: Checkout source
uses: actions/checkout@v2
with:
@@ -36,6 +39,22 @@ jobs:
- name: Run tests
run: |
TEST_USERNAME=${{secrets.TEST_USERNAME}} ${{steps.path.outputs.pythonv}} do.py test novus10g
+ - name: Send Coverage Email
+ run: |
+ ${{steps.path.outputs.pythonv}} do.py coverage
+
+ - name: Generate Allure report
+ run: |
+ source ~/.profile
+ ${{steps.path.outputs.pythonv}} do.py generate_allure_report
+
+ - name: Deploy report to Github Pages
+ if: always()
+ uses: peaceiris/actions-gh-pages@v2
+ env:
+ PERSONAL_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ PUBLISH_BRANCH: gh-pages
+ PUBLISH_DIR: allure-report
publish_artifacts:
runs-on: [snappi-ixn-ci-novus10g]
diff --git a/do.py b/do.py
index 84559065f..5d3b32479 100644
--- a/do.py
+++ b/do.py
@@ -4,8 +4,12 @@
import sys
import shutil
import subprocess
+import smtplib
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+global ixnexception
def setup():
run(
[
@@ -70,18 +74,23 @@ def test(card="novus100g"):
"--username=" + username,
"--psd='" + psd + "'",
"tests",
- '-m "not e2e and not l1_manual and not uhd"',
- "--cov=./snappi_ixnetwork --cov-report term"
+ #'-m "not e2e and not l1_manual and not uhd"',
+ '-m "runonly"',
+ "--cov=./snappi_ixnetwork --cov-report term",
" --cov-report html:cov_report",
+ " -o junit_logging=all --junitxml=allure-results/report-pytest.xml"
]
print(args)
run(
[
py() + " -m pip install pytest-cov",
- py() + " -m pytest -sv {}".format(" ".join(args)),
+ py() + " -m pytest -sv {} | tee myfile.log ".format(" ".join(args)),
]
)
+
+
+
import re
with open("./cov_report/index.html") as fp:
@@ -100,6 +109,137 @@ def test(card="novus100g"):
)
)
+def generate_allure_report():
+ run(["mkdir -p allure-results/history"])
+ run(["cp -r $HOME/allure-report/history/* allure-results/history/"])
+ run(["rm -rf $HOME/allure-report"])
+
+ run(['echo "CI/CD-Information" > allure-results/environment.properties',
+ 'echo "Platform = athena-g" >> allure-results/environment.properties',
+ 'echo "Release = 5.15.0-60-generic" >> allure-results/environment.properties',
+ 'echo "OS-Version" >> allure-results/environment.properties',
+ "lsb_release -a | sed -E 's/([^:]+) /\1-/g' | sed 's/:/=/g' > version.txt",
+ 'cat version.txt >> allure-results/environment.properties',
+ 'rm -rf version.txt',
+ 'echo "Environment-Details" >> allure-results/environment.properties',
+ "python_ver=`python3 --version`",
+ "pytest_ver=`pytest --version`",
+ 'echo "Python-Version = $python_ver" >> allure-results/environment.properties',
+ 'echo "Pytest-Version = $pytest_ver" >> allure-results/environment.properties',
+ 'go_ver=`go version`',
+ 'echo "Go-Version = $go_ver" >> allure-results/environment.properties',
+ 'allure_ver=$(docker exec "$CONTAINER_NAME" allure --version)',
+ 'echo "Allure-Version = $allure_ver" >> allure-results/environment.properties'])
+
+
+ run(
+ [
+ "allure generate allure-results -c -o allure-report",
+ ]
+ )
+
+ run(["cp -r allure-report $HOME/allure-report "])
+
+
+def coverage():
+
+ test_start = (subprocess.check_output("echo $TIMESTAMP", shell=True)).decode('ascii')
+ coverage_threshold = 67
+ global result
+ with open("myfile.log") as fp:
+ out = fp.read()
+ total_selected_tests = re.findall(r"collecting.*\s+(\d+)\s+selected", out)[0]
+ total_passed_tests = re.findall(r"=.*\s(\d+)\s+passed", out)[0]
+ if re.findall(r"=.*\s(\d+)\s+skipped",out):
+ total_skipped_tests = re.findall(r"=.*\s(\d+)\s+skipped", out)[0]
+ else:
+ total_skipped_tests = 0
+
+ total_failed_tests = int(total_selected_tests) - int(total_passed_tests) - int(total_skipped_tests)
+
+ with open("./cov_report/index.html") as fp:
+ out = fp.read()
+ result = re.findall(r"data-ratio.*?[>](\d+)\b", out)[-1]
+
+ sender = "ixnetworksnappi@gmail.com"
+ #receiver = ["arkajyoti.dutta@keysight.com","indrani.bhattacharya@keysight.com","dipendu.ghosh@keysight.com","desai.mg@keysight.com"]
+ receiver = ["desai.mg@keysight.com"]
+ msg = MIMEMultipart('alternative')
+ msg['Subject'] = "Snappi-Ixnetwork Coverage Email"
+ msg['From'] = sender
+ msg['To'] = ", ".join(receiver)
+
+ val1=total_selected_tests
+ val2=total_passed_tests
+ val3=total_failed_tests
+
+ build_number=get_workflow_id()
+
+ # Create the body of the message (a plain-text and an HTML version).
+ text = "Hi!"
+ html = """\
+
+
+
+
+ Hi All,
+ Please find the coverage results for the build execution ID : """+str(build_number)+"""
+ Build started on : """+str(test_start)+""" IST
+
+
+
+
+ Total Testcases |
+ """+str(val1)+""" |
+
+
+ Total Test Pass |
+ """+str(val2)+""" |
+
+
+ Total Test Fail |
+ """+str(val3)+""" |
+
+
+ Test Coverage Percentage |
+ """+str(result)+""" |
+
+
+
+ Click on the url for detailed test execution summary : Report
+
+
+
Thanks,
+ Snappi-Ixnetwork Team
+
+
+
+
+ """
+ #.format("200","198","2","99%")
+
+ # Record the MIME types of both parts - text/plain and text/html.
+ part1 = MIMEText(text, 'plain')
+ part2 = MIMEText(html, 'html')
+
+ # Attach parts into message container.
+ # According to RFC 2046, the last part of a multipart message, in this case
+ # the HTML message, is best and preferred.
+ msg.attach(part1)
+ msg.attach(part2)
+
+ # Send the message via local SMTP server.
+ s = smtplib.SMTP('smtp.gmail.com', 587)
+ s.starttls()
+ s.login(sender, "fbgt tiid rduu ajar")
+ # sendmail function takes 3 arguments: sender's address, recipient's address
+ # and message to send - here it is sent as one string.
+ s.sendmail(sender, receiver, msg.as_string())
+ s.quit()
def dist():
clean()
@@ -243,14 +383,15 @@ def run(commands):
cmd = cmd.encode("utf-8", errors="ignore")
subprocess.check_call(cmd, shell=True)
except Exception:
- sys.exit(1)
-
+ ixnexception = False
+ #sys.exit(1)
def get_workflow_id():
import requests
cmd = "https://api.github.com/repos/open-traffic-generator/snappi-ixnetwork/actions/runs"
res = requests.get(cmd)
+ print(res)
workflow_id = res.json()["workflow_runs"][0]["workflow_id"]
return workflow_id
diff --git a/requirements.txt b/requirements.txt
index 07652960c..83445921e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,4 +4,6 @@ netaddr==0.8.0
ipaddress==1.0.23
flake8
dpkt
-black; python_version > '3.6'
\ No newline at end of file
+black
+pytest-cov
+allure-pytest; python_version > '3.6'
\ No newline at end of file
diff --git a/snappi_ixnetwork/capture.py b/snappi_ixnetwork/capture.py
index ce41f6c6d..33203c938 100644
--- a/snappi_ixnetwork/capture.py
+++ b/snappi_ixnetwork/capture.py
@@ -138,8 +138,8 @@ def _config_missing_pallete(self, cap_filter, pallette, trigger, filter):
pallete_map = getattr(
self, "_{0}_OFFSET_MAP".format(cap_filter.parent.choice.upper())
)
- for field_name in dir(cap_filter):
- if field_name not in pallete_map:
+ for field_name in pallete_map:
+ if field_name not in dir(cap_filter):
raise Exception(
"Api not implimented for {0}".format(field_name)
)
diff --git a/snappi_ixnetwork/device/utils.py b/snappi_ixnetwork/device/utils.py
index b7751a634..71376ccff 100644
--- a/snappi_ixnetwork/device/utils.py
+++ b/snappi_ixnetwork/device/utils.py
@@ -13,15 +13,15 @@ def namedtuple_with_defaults(typename, field_names, default_values=()):
T.__new__.__defaults__ = tuple(prototype)
return T
-
-def asdot2plain(asdot):
- """This returns an ASPLAIN formated ASN given an ASDOT+ format"""
- if re.findall(r"\.|\:", asdot):
- left, right = re.split(r"\.|\:", asdot)
- ret = int(left) * 65536 + int(right)
- return ret
- else:
- return int(asdot)
+# commenting unused libraries
+# def asdot2plain(asdot):
+# """This returns an ASPLAIN formated ASN given an ASDOT+ format"""
+# if re.findall(r"\.|\:", asdot):
+# left, right = re.split(r"\.|\:", asdot)
+# ret = int(left) * 65536 + int(right)
+# return ret
+# else:
+# return int(asdot)
def convert_as_values(as_types, as_values):
@@ -49,7 +49,6 @@ def convert_as_values(as_types, as_values):
convert_values.ip_addr[idx] = num
return convert_values
-
def hex_to_ipv4(hex_value):
bytes = ["".join(x) for x in zip(*[iter(hex_value)] * 2)]
bytes = [int(x, 16) for x in bytes]
diff --git a/snappi_ixnetwork/ping.py b/snappi_ixnetwork/ping.py
index fc4eb5f45..845de9470 100644
--- a/snappi_ixnetwork/ping.py
+++ b/snappi_ixnetwork/ping.py
@@ -32,52 +32,55 @@ def results(self, ping_request, req_type=None):
for ip in eth.ipv6_addresses:
v6_names.append(ip.name)
if req_type==None :
- with Timer(self._api, "Ping requests completed in"):
- for endpoint in ping_request.endpoints:
- response = {}
- req_type = endpoint.parent.choice
- src_name = endpoint.get("src_name")
- dst_ip = endpoint.get("dst_ip")
- if req_type == "ipv4":
- if src_name not in v4_names:
- msg = (
- src_name
- + """ is not available in the configured v4 interface names """
- + str(v4_names)
- )
- raise Exception(msg)
- ip_obj = (
- self._api._ixnetwork.Topology.find()
- .DeviceGroup.find()
- .Ethernet.find()
- .Ipv4.find(Name=src_name)
- )
- elif req_type == "ipv6":
- if src_name not in v6_names:
- msg = (
- src_name
- + """ is not available in the configured v6 interface names """
- + str(v6_names)
- )
- raise Exception(msg)
- ip_obj = (
- self._api._ixnetwork.Topology.find()
- .DeviceGroup.find()
- .Ethernet.find()
- .Ipv6.find(Name=src_name)
- )
- self._api.info("Sending ping to %s" % dst_ip)
- ping_status = ip_obj.SendPing(DestIP=dst_ip)
- for reply in ping_status:
- if dst_ip in reply["arg3"]:
- if reply["arg2"]:
- response["result"] = "success"
- else:
- response["result"] = "failure"
- response["src_name"] = src_name
- response["dst_ip"] = dst_ip
- responses.append(response)
- return responses
+ pass
+ # commenting this section as the latest snappi doesnt pass the request without req_type
+ # with Timer(self._api, "Ping requests completed in"):
+ # for endpoint in ping_request.endpoints:
+ # response = {}
+ # req_type = endpoint.parent.choice
+ # src_name = endpoint.get("src_name")
+ # dst_ip = endpoint.get("dst_ip")
+ # if req_type == "ipv4":
+ # if src_name not in v4_names:
+ # msg = (
+ # src_name
+ # + """ is not available in the configured v4 interface names """
+ # + str(v4_names)
+ # )
+ # raise Exception(msg)
+ # ip_obj = (
+ # self._api._ixnetwork.Topology.find()
+ # .DeviceGroup.find()
+ # .Ethernet.find()
+ # .Ipv4.find(Name=src_name)
+ # )
+ # elif req_type == "ipv6":
+ # if src_name not in v6_names:
+ # msg = (
+ # src_name
+ # + """ is not available in the configured v6 interface names """
+ # + str(v6_names)
+ # )
+ # raise Exception(msg)
+ # ip_obj = (
+ # self._api._ixnetwork.Topology.find()
+ # .DeviceGroup.find()
+ # .Ethernet.find()
+ # .Ipv6.find(Name=src_name)
+ # )
+ # self._api.info("Sending ping to %s" % dst_ip)
+ # ping_status = ip_obj.SendPing(DestIP=dst_ip)
+ # for reply in ping_status:
+ # if dst_ip in reply["arg3"]:
+ # if reply["arg2"]:
+ # response["result"] = "success"
+ # else:
+ # response["result"] = "failure"
+ # response["src_name"] = src_name
+ # response["dst_ip"] = dst_ip
+ # responses.append(response)
+ # return responses
+
else:
with Timer(self._api, "Ping requests completed in"):
for endpoint in ping_request.requests:
diff --git a/snappi_ixnetwork/snappi_api.py b/snappi_ixnetwork/snappi_api.py
index d4cdf2406..1cc38d8ac 100644
--- a/snappi_ixnetwork/snappi_api.py
+++ b/snappi_ixnetwork/snappi_api.py
@@ -403,156 +403,156 @@ def set_control_action(self, payload):
except Exception as err:
raise SnappiIxnException(err)
- def set_protocol_state(self, payload):
- """Set the transmit state of flows"""
- self.add_warnings(
- "set_protocol_state api is deprecated, Please use `set_control_state` with `protocol.all` choice instead"
- )
- try:
- if isinstance(payload, (type(self._protocol_state), str)) is False:
- raise TypeError(
- "The content must be of type Union[TransmitState, str]"
- )
- if isinstance(payload, str) is True:
- payload = self._protocol_state.deserialize(payload)
- self._connect()
- with Timer(self, "Setting Protocol state"):
- self.ngpf.set_protocol_state(payload)
- except Exception as err:
- raise SnappiIxnException(err)
- return self._request_detail()
-
- def set_transmit_state(self, payload):
- """Set the transmit state of flows"""
- self.add_warnings(
- "set_transmit_state api is deprecated, Please use `set_control_state` with `traffic` choice instead"
- )
- try:
- if isinstance(payload, (type(self._transmit_state), str)) is False:
- raise TypeError(
- "The content must be of type Union[TransmitState, str]"
- )
- if isinstance(payload, str) is True:
- payload = self._transmit_state.deserialize(payload)
- self._connect()
- self.traffic_item.transmit(payload)
- except Exception as err:
- raise SnappiIxnException(err)
- return self._request_detail()
-
- def set_link_state(self, link_state):
- self.add_warnings(
- "set_link_state api is deprecated, Please use `set_control_state` with `port.link` choice instead"
- )
- try:
- if isinstance(link_state, (type(self._link_state), str)) is False:
- raise TypeError(
- "The content must be of type Union[LinkState, str]"
- )
- if isinstance(link_state, str):
- link_state = self._link_state.deserialize(link_state)
- self._connect()
- if link_state.port_names is not None:
- self.vport.set_link_state(link_state)
- except Exception as err:
- raise SnappiIxnException(err)
- return self._request_detail()
-
- def set_capture_state(self, payload):
- """Starts capture on all ports that have capture enabled."""
- self.add_warnings(
- "set_capture_state api is deprecated, Please use `set_control_state` with `port.capture` choice instead"
- )
- try:
- if isinstance(payload, (type(self._capture_state), str)) is False:
- raise TypeError(
- "The content must be of type Union[CaptureState, str]"
- )
- if isinstance(payload, str) is True:
- payload = self._capture_state.deserialize(payload)
- self._connect()
- self.capture.set_capture_state(payload)
- except Exception as err:
- raise SnappiIxnException(err)
- return self._request_detail()
-
- def set_route_state(self, payload):
- self.add_warnings(
- "set_route_state api is deprecated, Please use `set_control_state` with `protocol.all` choice instead"
- )
- try:
- route_state = self.route_state()
- if isinstance(payload, (type(route_state), str)) is False:
- raise TypeError(
- "The content must be of type Union[RouteState, str]"
- )
- if isinstance(payload, str) is True:
- payload = route_state.deserialize(payload)
- self._connect()
- with Timer(self, "Setting route state"):
- self.ngpf.set_route_state(payload)
- return self._request_detail()
- except Exception as err:
- raise SnappiIxnException(err)
-
- def set_device_state(self, payload):
- self.add_warnings(
- "set_device_state api is deprecated, Please use `set_control_state` with `protocol.link` choice instead"
- )
- try:
- device_state = self.device_state()
- if isinstance(payload, (type(device_state), str)) is False:
- raise TypeError(
- "The content must be of type Union[DeviceState, str]"
- )
- if isinstance(payload, str) is True:
- payload = device_state.deserialize(payload)
- self._connect()
- with Timer(self, "Setting device state"):
- self.ngpf.set_device_state(payload)
- return self._request_detail()
- except Exception as err:
- raise SnappiIxnException(err)
-
- def send_ping(self, ping_request, cvg_api=None):
- self.add_warnings(
- "send_ping api is deprecated, Please use `set_control_action` with `protocol.ipv4.ping` choice instead"
- )
- try:
- if cvg_api:
- if isinstance(ping_request, type(cvg_api.ping_request())):
- if (
- isinstance(
- ping_request, (type(cvg_api.ping_request()), str)
- )
- is False
- ):
- raise TypeError(
- "The content must be of type Union[PingRequest, str]"
- )
- if isinstance(ping_request, str):
- ping_request = cvg_api.ping_request().deserialize(
- ping_request
- )
- ping_res = cvg_api.ping_response()
- cvg_api.ping_request().serialize()
- else:
- if (
- isinstance(ping_request, (type(self._ping_request), str))
- is False
- ):
- raise TypeError(
- "The content must be of type Union[PingRequest, str]"
- )
- if isinstance(ping_request, str):
- ping_request = self._ping_request.deserialize(ping_request)
- ping_res = self.ping_response()
- ping_request.serialize()
- self._connect()
- ping_res.responses.deserialize(self.ping.results(ping_request))
- return ping_res
- except Exception as err:
- raise SnappiIxnException(err)
+ # def set_protocol_state(self, payload):
+ # """Set the transmit state of flows"""
+ # self.add_warnings(
+ # "set_protocol_state api is deprecated, Please use `set_control_state` with `protocol.all` choice instead"
+ # )
+ # try:
+ # if isinstance(payload, (type(self._protocol_state), str)) is False:
+ # raise TypeError(
+ # "The content must be of type Union[TransmitState, str]"
+ # )
+ # if isinstance(payload, str) is True:
+ # payload = self._protocol_state.deserialize(payload)
+ # self._connect()
+ # with Timer(self, "Setting Protocol state"):
+ # self.ngpf.set_protocol_state(payload)
+ # except Exception as err:
+ # raise SnappiIxnException(err)
+ # return self._request_detail()
+
+ # def set_transmit_state(self, payload):
+ # """Set the transmit state of flows"""
+ # self.add_warnings(
+ # "set_transmit_state api is deprecated, Please use `set_control_state` with `traffic` choice instead"
+ # )
+ # try:
+ # if isinstance(payload, (type(self._transmit_state), str)) is False:
+ # raise TypeError(
+ # "The content must be of type Union[TransmitState, str]"
+ # )
+ # if isinstance(payload, str) is True:
+ # payload = self._transmit_state.deserialize(payload)
+ # self._connect()
+ # self.traffic_item.transmit(payload)
+ # except Exception as err:
+ # raise SnappiIxnException(err)
+ # return self._request_detail()
+
+ # def set_link_state(self, link_state):
+ # self.add_warnings(
+ # "set_link_state api is deprecated, Please use `set_control_state` with `port.link` choice instead"
+ # )
+ # try:
+ # if isinstance(link_state, (type(self._link_state), str)) is False:
+ # raise TypeError(
+ # "The content must be of type Union[LinkState, str]"
+ # )
+ # if isinstance(link_state, str):
+ # link_state = self._link_state.deserialize(link_state)
+ # self._connect()
+ # if link_state.port_names is not None:
+ # self.vport.set_link_state(link_state)
+ # except Exception as err:
+ # raise SnappiIxnException(err)
+ # return self._request_detail()
+
+ # def set_capture_state(self, payload):
+ # """Starts capture on all ports that have capture enabled."""
+ # self.add_warnings(
+ # "set_capture_state api is deprecated, Please use `set_control_state` with `port.capture` choice instead"
+ # )
+ # try:
+ # if isinstance(payload, (type(self._capture_state), str)) is False:
+ # raise TypeError(
+ # "The content must be of type Union[CaptureState, str]"
+ # )
+ # if isinstance(payload, str) is True:
+ # payload = self._capture_state.deserialize(payload)
+ # self._connect()
+ # self.capture.set_capture_state(payload)
+ # except Exception as err:
+ # raise SnappiIxnException(err)
+ # return self._request_detail()
+
+ # def set_route_state(self, payload):
+ # self.add_warnings(
+ # "set_route_state api is deprecated, Please use `set_control_state` with `protocol.all` choice instead"
+ # )
+ # try:
+ # route_state = self.route_state()
+ # if isinstance(payload, (type(route_state), str)) is False:
+ # raise TypeError(
+ # "The content must be of type Union[RouteState, str]"
+ # )
+ # if isinstance(payload, str) is True:
+ # payload = route_state.deserialize(payload)
+ # self._connect()
+ # with Timer(self, "Setting route state"):
+ # self.ngpf.set_route_state(payload)
+ # return self._request_detail()
+ # except Exception as err:
+ # raise SnappiIxnException(err)
+
+ # def set_device_state(self, payload):
+ # self.add_warnings(
+ # "set_device_state api is deprecated, Please use `set_control_state` with `protocol.link` choice instead"
+ # )
+ # try:
+ # device_state = self.device_state()
+ # if isinstance(payload, (type(device_state), str)) is False:
+ # raise TypeError(
+ # "The content must be of type Union[DeviceState, str]"
+ # )
+ # if isinstance(payload, str) is True:
+ # payload = device_state.deserialize(payload)
+ # self._connect()
+ # with Timer(self, "Setting device state"):
+ # self.ngpf.set_device_state(payload)
+ # return self._request_detail()
+ # except Exception as err:
+ # raise SnappiIxnException(err)
+
+ # def send_ping(self, ping_request, cvg_api=None):
+ # self.add_warnings(
+ # "send_ping api is deprecated, Please use `set_control_action` with `protocol.ipv4.ping` choice instead"
+ # )
+ # try:
+ # if cvg_api:
+ # if isinstance(ping_request, type(cvg_api.ping_request())):
+ # if (
+ # isinstance(
+ # ping_request, (type(cvg_api.ping_request()), str)
+ # )
+ # is False
+ # ):
+ # raise TypeError(
+ # "The content must be of type Union[PingRequest, str]"
+ # )
+ # if isinstance(ping_request, str):
+ # ping_request = cvg_api.ping_request().deserialize(
+ # ping_request
+ # )
+ # ping_res = cvg_api.ping_response()
+ # cvg_api.ping_request().serialize()
+ # else:
+ # if (
+ # isinstance(ping_request, (type(self._ping_request), str))
+ # is False
+ # ):
+ # raise TypeError(
+ # "The content must be of type Union[PingRequest, str]"
+ # )
+ # if isinstance(ping_request, str):
+ # ping_request = self._ping_request.deserialize(ping_request)
+ # ping_res = self.ping_response()
+ # ping_request.serialize()
+ # self._connect()
+ # ping_res.responses.deserialize(self.ping.results(ping_request))
+ # return ping_res
+ # except Exception as err:
+ # raise SnappiIxnException(err)
def get_capture(self, request):
"""Gets capture file and returns it as a byte stream"""
diff --git a/snappi_ixnetwork/trafficitem.py b/snappi_ixnetwork/trafficitem.py
index 1ebdbaad9..021a253ee 100644
--- a/snappi_ixnetwork/trafficitem.py
+++ b/snappi_ixnetwork/trafficitem.py
@@ -964,7 +964,7 @@ def _getUhdHeader(self, header=None):
elif header == "ethernetpause":
# This is to support 9.20 globalpause header
if "globalPause" in self._getProtocolTemplatelist():
- return header + "UHD"
+ return header
return header
def _getProtocolTemplatelist(self):
diff --git a/tests/arp/test_arp_packet.py b/tests/arp/test_arp_packet.py
index c0f6039df..0738edc86 100644
--- a/tests/arp/test_arp_packet.py
+++ b/tests/arp/test_arp_packet.py
@@ -1,6 +1,7 @@
import pytest
+import allure
-
+@pytest.mark.runonly
def test_arp_packet(api, b2b_raw_config_vports, utils, tx_vport, rx_vport):
flow1 = b2b_raw_config_vports.flows[0]
sender_hardware_addr = "00:0C:29:E3:53:EA"
diff --git a/tests/capture/test_capture_filter_settings.py b/tests/capture/test_capture_filter_settings.py
index 7127c56f1..790d3eea4 100644
--- a/tests/capture/test_capture_filter_settings.py
+++ b/tests/capture/test_capture_filter_settings.py
@@ -21,7 +21,7 @@ def test_capture_filter_settings(api, settings):
cap = config.captures.capture(name="capture1")[-1]
cap.port_names = [tx.name]
- filter1, filter2 = cap.filters.ethernet().custom()
+ filter1, filter2, filter3 = cap.filters.ethernet().custom().vlan()
# https://github.com/open-traffic-generator/snappi/issues/25
# currently assigning the choice as work around
@@ -39,6 +39,12 @@ def test_capture_filter_settings(api, settings):
filter2.mask = attrs["PatternMask1"]
filter2.negate = True
+ filter3.priority.value = '00ff'
+ filter3.cfi.mask='00dd'
+ filter3.id.negate = False
+ filter3.protocol.negate = False
+
+
try:
api.set_config(config)
except Exception as e:
diff --git a/tests/traffic/test_traffic_custom_header.py b/tests/traffic/test_traffic_custom_header.py
new file mode 100644
index 000000000..532b286db
--- /dev/null
+++ b/tests/traffic/test_traffic_custom_header.py
@@ -0,0 +1,30 @@
+def test_traffic_custom_header(api, b2b_raw_config, utils):
+ """
+ Configure the devices on Tx and Rx Port.
+ Configure the flow with devices as end points.
+ run the traffic
+ Validation,
+ - validate the port and flow statistics.
+ """
+
+ size = 1518
+ packets = 100
+
+ flow = b2b_raw_config.flows[0]
+ custom = flow.packet.custom()[-1]
+
+ custom.bytes="64"
+
+ metric_tag = custom.metric_tags.add()
+ metric_tag.name = "custom metric tag"
+ metric_tag.offset = 32
+ metric_tag.length = 32
+
+
+ flow.duration.fixed_packets.packets = packets
+ flow.size.fixed = size
+ flow.rate.percentage = 10
+ flow.metrics.enable = True
+
+ utils.start_traffic(api, b2b_raw_config)
+
diff --git a/tests/traffic/test_traffic_ethernet_pause.py b/tests/traffic/test_traffic_ethernet_pause.py
new file mode 100644
index 000000000..feffcd0f9
--- /dev/null
+++ b/tests/traffic/test_traffic_ethernet_pause.py
@@ -0,0 +1,27 @@
+def test_traffic_ethernet_pause(api, b2b_raw_config, utils):
+ """
+ Configure the devices on Tx and Rx Port.
+ Configure the flow with devices as end points.
+ run the traffic
+ Validation,
+ - validate the port and flow statistics.
+ """
+
+ size = 1518
+ packets = 100
+
+ flow = b2b_raw_config.flows[0]
+ eth = flow.packet.ethernetpause()[-1]
+
+ eth.src.value = "00:CD:DC:CD:DC:CD"
+ eth.dst.value = "00:AB:BC:AB:BC:AB"
+
+ eth.control_op_code.value= 115
+
+ flow.duration.fixed_packets.packets = packets
+ flow.size.fixed = size
+ flow.rate.percentage = 10
+ flow.metrics.enable = True
+
+ utils.start_traffic(api, b2b_raw_config)
+
diff --git a/tests/traffic/test_traffic_gptv_config.py b/tests/traffic/test_traffic_gptv_config.py
new file mode 100644
index 000000000..e0e2df824
--- /dev/null
+++ b/tests/traffic/test_traffic_gptv_config.py
@@ -0,0 +1,43 @@
+def test_traffic_gptv_config(api, b2b_raw_config, utils):
+ """
+ Configure the devices on Tx and Rx Port.
+ Configure the flow with devices as end points.
+ run the traffic
+ Validation,
+ - validate the port and flow statistics.
+ """
+
+ size = 1518
+ packets = 100
+
+ flow = b2b_raw_config.flows[0]
+ eth, gptv = flow.packet.ethernet().gtpv1()
+
+ eth.src.value = "00:CD:DC:CD:DC:CD"
+ eth.dst.value = "00:AB:BC:AB:BC:AB"
+
+ gptv.version.value=1
+ gptv.protocol_type.value=1
+ gptv.message_type.value=1
+ gptv.message_length.value=256
+
+ flow.duration.fixed_packets.packets = packets
+ flow.size.fixed = size
+ flow.rate.percentage = 10
+ flow.metrics.enable = True
+
+ utils.start_traffic(api, b2b_raw_config)
+ utils.wait_for(
+ lambda: results_ok(api, utils, size, packets),
+ "stats to be as expected",
+ timeout_seconds=10,
+ )
+
+def results_ok(api, utils, size, packets):
+ """
+ Returns true if stats are as expected, false otherwise.
+ """
+ port_results, flow_results = utils.get_all_stats(api)
+ frames_ok = utils.total_frames_ok(port_results, flow_results, packets)
+ bytes_ok = utils.total_bytes_ok(port_results, flow_results, packets * size)
+ return frames_ok and bytes_ok
diff --git a/tests/vxlan/test_vxlan_b2b.py b/tests/vxlan/test_vxlan_b2b.py
index 88570f273..3f9d8c1cf 100644
--- a/tests/vxlan/test_vxlan_b2b.py
+++ b/tests/vxlan/test_vxlan_b2b.py
@@ -1,9 +1,9 @@
import pytest
-@pytest.mark.skip(
- reason="Fix - intermittent failure in CI, need to debug"
-)
+# @pytest.mark.skip(
+# reason="Fix - intermittent failure in CI, need to debug"
+# )
def test_vxlan_b2b(api, utils):
config = api.config()
diff --git a/tests/vxlan/test_vxlan_b2b_v6.py b/tests/vxlan/test_vxlan_b2b_v6.py
new file mode 100644
index 000000000..798598def
--- /dev/null
+++ b/tests/vxlan/test_vxlan_b2b_v6.py
@@ -0,0 +1,158 @@
+import pytest
+
+def test_vxlan_b2b_v6(api, utils):
+ config = api.config()
+
+ p1, p2 = config.ports.port(
+ name="tx", location=utils.settings.ports[0]
+ ).port(name="rx", location=utils.settings.ports[1])
+
+ d1, d2 = config.devices.device(name="d1").device(name="d2")
+
+ e1, e2 = d1.ethernets.add(), d2.ethernets.add()
+ e1.connection.port_name, e2.connection.port_name = p1.name, p2.name
+ e1.name, e2.name = "e1", "e2"
+ e1.mac, e2.mac = "00:01:00:00:00:01", "00:01:00:00:00:02"
+
+ ip1, ip2 = e1.ipv6_addresses.add(), e2.ipv6_addresses.add()
+ ip1.name, ip2.name = "ip_d1", "ip_d2"
+
+ ip1.address, ip2.address = "2001::1", "2001::2"
+ ip1.gateway, ip2.gateway = "2001::2", "2001::1"
+
+ bgp1, bgp2 = d1.bgp, d2.bgp
+ bgp1.router_id, bgp2.router_id = "10.10.10.1", "10.10.10.2"
+ bgp1_ipv6 = bgp1.ipv6_interfaces.add()
+ bgp2_ipv6 = bgp2.ipv6_interfaces.add()
+
+ bgp1_ipv6.ipv6_name, bgp2_ipv6.ipv6_name = ip1.name, ip2.name
+ bgp1_peer, bgp2_peer = bgp1_ipv6.peers.add(), bgp2_ipv6.peers.add()
+ bgp1_peer.name, bgp2_peer.name = "bgp_router1", "bgp_router2"
+
+ bgp1_peer.peer_address, bgp2_peer.peer_address = "2001::2", "2001::1"
+ bgp1_peer.as_type, bgp2_peer.as_type = "ebgp", "ebgp"
+ bgp1_peer.as_number, bgp2_peer.as_number = 100, 200
+
+ # Create & advertise loopback under bgp in d1 & d2
+ d1_l1 = d1.ipv6_loopbacks.add()
+ d1_l1.name = "d1_loopback1"
+ d1_l1.eth_name = "e1"
+ d1_l1.address = "3000::1"
+
+ bgp1_l1 = bgp1_peer.v6_routes.add(name="bgp_l1")
+ bgp1_l1.addresses.add(address="3000::1", prefix=32)
+
+ d2_l1 = d2.ipv6_loopbacks.add()
+ d2_l1.name = "d2_loopback1"
+ d2_l1.eth_name = "e2"
+ d2_l1.address = "4000::2"
+
+ bgp2_l1 = bgp2_peer.v6_routes.add(name="bgp2_l1")
+ bgp2_l1.addresses.add(address="4000::2", prefix=32)
+
+ # Create vxlan on d1
+ d1_vxlan = d1.vxlan.v6_tunnels.add()
+
+ d1_vxlan.vni = 1000
+ d1_vxlan.source_interface = d1_l1.name
+ d1_vxlan.name = "d1_vxlan"
+
+ # unicast communication
+ vtep = d1_vxlan.destination_ip_mode.unicast.vteps.add()
+ vtep.remote_vtep_address = "4000::2"
+ vtep.arp_suppression_cache.add("00:16:01:00:00:01", "5001::2")
+
+ # Create vxlan on d2
+ d2_vxlan = d2.vxlan.v6_tunnels.add()
+
+ d2_vxlan.vni = 1000
+ d2_vxlan.source_interface = d2_l1.name
+ d2_vxlan.name = "d2_vxlan"
+
+ # unicast communication
+ vtep = d2_vxlan.destination_ip_mode.unicast.vteps.add()
+ vtep.remote_vtep_address = "3000::1"
+ vtep.arp_suppression_cache.add("00:18:01:00:00:01", "5001::1")
+
+ # create two edge devices to communicate over vxlan
+ edge_d1 = config.devices.device(name="edge_d1")[-1]
+ edge_d2 = config.devices.device(name="edge_d2")[-1]
+
+ edge_e1 = edge_d1.ethernets.ethernet()[-1]
+ edge_e2 = edge_d2.ethernets.ethernet()[-1]
+
+ edge_e1.connection.vxlan_name = d1_vxlan.name
+ edge_e2.connection.vxlan_name = d2_vxlan.name
+
+ # edge_e1.connection.port_name = p1.name
+ # edge_e2.connection.port_name = p2.name
+
+ edge_e1.name, edge_e2.name = "edge_e1", "edge_e2"
+ edge_e1.mac, edge_e2.mac = "00:18:01:00:00:01", "00:16:01:00:00:01"
+
+ edge_ip1 = edge_e1.ipv6_addresses.add()
+ edge_ip2 = edge_e2.ipv6_addresses.add()
+
+ edge_ip1.name, edge_ip2.name = "edge_ip_d1", "edge_ip_d2"
+
+ edge_ip1.address, edge_ip2.address = "5001::1", "5001::2"
+ edge_ip1.gateway, edge_ip2.gateway = "5001::2", "5001::1"
+
+ edge_bgp1, edge_bgp2 = edge_d1.bgp, edge_d2.bgp
+ edge_bgp1.router_id, edge_bgp2.router_id = "100.1.1.1", "100.1.1.2"
+
+ edge_bgp1_ipv6 = edge_bgp1.ipv6_interfaces.add()
+ edge_bgp2_ipv6 = edge_bgp2.ipv6_interfaces.add()
+
+ edge_bgp1_ipv6.ipv6_name = edge_ip1.name
+ edge_bgp2_ipv6.ipv6_name = edge_ip2.name
+
+ edge_bgp1_peer = edge_bgp1_ipv6.peers.add()
+ edge_bgp2_peer = edge_bgp2_ipv6.peers.add()
+
+ edge_bgp1_peer.name, edge_bgp2_peer.name = "edge_bgp1", "edge_bgp2"
+
+ edge_bgp1_peer.peer_address = "5001::2"
+ edge_bgp2_peer.peer_address = "5001::1"
+
+ edge_bgp1_peer.as_type, edge_bgp2_peer.as_type = "ibgp", "ibgp"
+ edge_bgp1_peer.as_number, edge_bgp2_peer.as_number = 1000, 1000
+
+ edge_bgp1_rr = edge_bgp1_peer.v6_routes.add(name="A1")
+ edge_bgp1_rr.addresses.add(address="1000::1", prefix=32)
+
+ edge_bgp2_rr = edge_bgp2_peer.v6_routes.add(name="A2")
+ edge_bgp2_rr.addresses.add(address="1001::1", prefix=32)
+
+ flow = config.flows.flow(name="f1")[-1]
+ flow.tx_rx.device.tx_names = [edge_bgp1_rr.name]
+ flow.tx_rx.device.rx_names = [edge_bgp2_rr.name]
+
+ flow.duration.fixed_packets.packets = 100
+
+ flow.metrics.enable = True
+ flow.metrics.loss = True
+
+ utils.start_traffic(api, config, start_capture=False)
+
+ utils.wait_for(
+ lambda: results_ok(api, ["f1"], 100),
+ "stats to be as expected",
+ timeout_seconds=10,
+ )
+ utils.stop_traffic(api, config)
+
+
+def results_ok(api, flow_names, expected):
+ """
+ Returns True if there is no traffic loss else False
+ """
+ request = api.metrics_request()
+ request.flow.flow_names = flow_names
+ flow_results = api.get_metrics(request).flow_metrics
+ flow_rx = sum([f.frames_rx for f in flow_results])
+ return flow_rx == expected
+
+
+if __name__ == "__main__":
+ pytest.main(["-s", __file__])