Merge branch 'main' into sp-perf-opl
sarathbrp authored Jan 12, 2024
2 parents c24965d + 6fd9e1f commit 9b5f208
Showing 11 changed files with 292 additions and 126 deletions.
57 changes: 57 additions & 0 deletions core/opl/args.py
@@ -98,6 +98,11 @@ def add_kafka_opts(parser):
default=int(os.getenv("KAFKA_PORT", 9092)),
help="Kafka port (also use env variable KAFKA_PORT)",
)
parser.add_argument(
"--kafka-acks",
default=os.getenv("KAFKA_ACKS", "all"),
help="How many acknowledgments the producer requires, either all, 1 or 0 (also use env variable KAFKA_ACKS)",
)
parser.add_argument(
"--kafka-timeout",
type=int,
@@ -109,6 +114,58 @@ def add_kafka_opts(parser):
default=os.getenv("KAFKA_GROUP", f"perf-test-{socket.gethostname()}"),
help="Kafka consumer group (also use env variable KAFKA_GROUP)",
)
parser.add_argument(
"--kafka-username",
default=os.getenv("KAFKA_USERNAME", ""),
help="Kafka username when logging into SASL cluster like MSK (also use env variable KAFKA_USERNAME)",
)
parser.add_argument(
"--kafka-password",
default=os.getenv("KAFKA_PASSWORD", ""),
help="Kafka password when logging into SASL cluster like MSK (also use env variable KAFKA_PASSWORD)",
)
parser.add_argument(
"--kafka-request-timeout-ms",
type=int,
default=int(os.getenv("KAFKA_REQUEST_TIMEOUT_MS", 30000)),
help="The client is going to wait this much time for the server to respond to a request (also use env variable KAFKA_REQUEST_TIMEOUT_MS)",
)
parser.add_argument(
"--kafka-max_block_ms",
type=int,
default=int(os.getenv("KAFKA_MAX_BLOCK_MS", 60000)),
help="Max time to block send e.g. because buffer is full (also use env variable KAFKA_MAX_BLOCK_MS)",
)
parser.add_argument(
"--kafka-linger-ms",
type=int,
default=int(os.getenv("KAFKA_LINGER_MS", 0)),
help="Max time to wait for more messages when creating batch (also use env variable KAFKA_LINGER_MS)",
)
parser.add_argument(
"--kafka-compression-type",
choices=[None, "gzip", "snappy", "lz4"],
default=os.getenv("KAFKA_COMPRESSION_TYPE", None),
help="The compression type for all data generated by the producer (also use env variable KAFKA_COMPRESSION_TYPE)",
)
parser.add_argument(
"--kafka-batch-size",
type=int,
default=int(os.getenv("KAFKA_BATCH_SIZE", 16384)),
help="Max size of the batch before sending (also use env variable KAFKA_BATCH_SIZE)",
)
parser.add_argument(
"--kafka-buffer-memory",
type=int,
default=int(os.getenv("KAFKA_BUFFER_MEMORY", 33554432)),
help="Memory the producer can use at max for batching (also use env variable KAFKA_BUFFER_MEMORY)",
)
parser.add_argument(
"--kafka-retries",
type=int,
default=int(os.getenv("KAFKA_RETRIES", 0)),
help="Resend any record whose send fails this many times. Can cause duplicates! (also use env variable KAFKA_RETRIES)",
)


def add_mosquitto_opts(parser):
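For context, a rough sketch of how the new `--kafka-*` options above might be wired into a `kafka-python` producer. The `build_producer` helper, the `--kafka-host` option and the import path are assumptions for illustration, not code from this commit:

```python
import argparse

from kafka import KafkaProducer  # kafka-python

from opl import args as opl_args  # import path assumed


def build_producer(args):
    """Map parsed --kafka-* options onto KafkaProducer keyword arguments (illustrative only)."""
    kwargs = {
        # --kafka-host is assumed to exist alongside the --kafka-port option shown above
        "bootstrap_servers": f"{args.kafka_host}:{args.kafka_port}",
        # acks accepts 0, 1 or "all"; the option arrives as a string, so convert the numeric cases
        "acks": args.kafka_acks if args.kafka_acks == "all" else int(args.kafka_acks),
        "request_timeout_ms": args.kafka_request_timeout_ms,
        "max_block_ms": args.kafka_max_block_ms,
        "linger_ms": args.kafka_linger_ms,
        "compression_type": args.kafka_compression_type,
        "batch_size": args.kafka_batch_size,
        "buffer_memory": args.kafka_buffer_memory,
        "retries": args.kafka_retries,
    }
    if args.kafka_username and args.kafka_password:
        # Same SASL/SCRAM settings the consumers later in this diff use
        kwargs.update(
            security_protocol="SASL_SSL",
            sasl_mechanism="SCRAM-SHA-512",
            sasl_plain_username=args.kafka_username,
            sasl_plain_password=args.kafka_password,
        )
    return KafkaProducer(**kwargs)


parser = argparse.ArgumentParser()
opl_args.add_kafka_opts(parser)
producer = build_producer(parser.parse_args())
```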
6 changes: 3 additions & 3 deletions core/opl/cluster_read.py
@@ -10,8 +10,7 @@
import jinja2
import jinja2.exceptions
import boto3

from requests.packages.urllib3.exceptions import InsecureRequestWarning
import urllib3

from . import data
from . import date
@@ -115,7 +114,7 @@ def measure(self, ri, name, monitoring_query, monitoring_step):
"start": ri.start.timestamp(),
"end": ri.end.timestamp(),
}
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
response = requests.get(
url, headers=headers, params=params, verify=False, timeout=60
)
@@ -200,6 +199,7 @@ def measure(self, ri, name, grafana_target):
}
url = f"{self.args.grafana_host}:{self.args.grafana_port}/api/datasources/proxy/{self.args.grafana_datasource}/render"

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
r = requests.post(
url=url, headers=headers, params=params, timeout=60, verify=False
)
18 changes: 16 additions & 2 deletions core/opl/investigator/README.md
@@ -2,8 +2,22 @@ Results investigator
====================

This tool (see `../pass_or_fail.py` in one level up directory) is supposed
to check historical results of some given test, compare with new result
and decide if new test result is PASS or FAIL.
to load historical results of a given test, compare them with a new result for
the same test, and decide whether the new test result is PASS or FAIL.

You can currently configure several things:

1. How to get historical results of the test (supports ElasticSearch, CSV
and a directory of JSON files)
2. How to load the new result (supports just a JSON file)
3. What method to use to decide whether the new result is out of safe bounds
(we mostly use `if the new result is bigger than the max or smaller than
the min of the historical data, it is a FAIL`, but it is easy to implement
more; see the sketch after this list)
4. What metrics from the JSONs to compare (e.g. `results.rps`,
`monitoring.pod.cpu.mean` and `monitoring.pod.memory.mean`)
5. Optionally, where to store metadata about the decision the script made.
This is useful for keeping track of trends (supports ElasticSearch and CSV)
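
A minimal sketch of that min/max method (the `is_pass`, `new_value` and `historical_values` names are illustrative, not the actual helpers `../pass_or_fail.py` uses):

```python
def is_pass(new_value, historical_values):
    """Return True when the new value lies within the historical min/max bounds."""
    return min(historical_values) <= new_value <= max(historical_values)

# Example: previous runs gave 98, 105 and 110 rps, so a new run with 97 rps is a FAIL.
print(is_pass(97, [98, 105, 110]))  # False
```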

See `sample_config.yaml` for example configuration. This is what each
section is for:
6 changes: 4 additions & 2 deletions core/opl/skelet.py
@@ -24,9 +24,11 @@ def setup_logger(app_name, stderr_log_lvl):

# Silence loggers of some chatty libraries we use
urllib_logger = logging.getLogger("urllib3.connectionpool")
urllib_logger.setLevel(stderr_log_lvl)
urllib_logger.setLevel(logging.WARNING)
selenium_logger = logging.getLogger("selenium.webdriver.remote.remote_connection")
selenium_logger.setLevel(stderr_log_lvl)
selenium_logger.setLevel(logging.WARNING)
kafka_logger = logging.getLogger("kafka")
kafka_logger.setLevel(logging.WARNING)

# Add stderr handler, with provided level
console_handler = logging.StreamHandler()
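For context, a small sketch of what the change above buys (a hypothetical call site; how `setup_logger` is actually invoked and what it returns are not shown in this diff):

```python
import logging

from opl import skelet  # import path assumed

# Even when the application itself asks for DEBUG output on stderr...
skelet.setup_logger("my-perf-test", logging.DEBUG)

# ...the chatty libraries are pinned to WARNING, so only real problems surface:
logging.getLogger("kafka").isEnabledFor(logging.DEBUG)    # False - suppressed
logging.getLogger("kafka").isEnabledFor(logging.WARNING)  # True
```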
57 changes: 57 additions & 0 deletions opl/args.py
@@ -98,6 +98,11 @@ def add_kafka_opts(parser):
default=int(os.getenv("KAFKA_PORT", 9092)),
help="Kafka port (also use env variable KAFKA_PORT)",
)
parser.add_argument(
"--kafka-acks",
default=os.getenv("KAFKA_ACKS", "all"),
help="How many acknowledgments the producer requires, either all, 1 or 0 (also use env variable KAFKA_ACKS)",
)
parser.add_argument(
"--kafka-timeout",
type=int,
@@ -109,6 +114,58 @@ def add_kafka_opts(parser):
default=os.getenv("KAFKA_GROUP", f"perf-test-{socket.gethostname()}"),
help="Kafka consumer group (also use env variable KAFKA_GROUP)",
)
parser.add_argument(
"--kafka-username",
default=os.getenv("KAFKA_USERNAME", ""),
help="Kafka username when logging into SASL cluster like MSK (also use env variable KAFKA_USERNAME)",
)
parser.add_argument(
"--kafka-password",
default=os.getenv("KAFKA_PASSWORD", ""),
help="Kafka password when logging into SASL cluster like MSK (also use env variable KAFKA_PASSWORD)",
)
parser.add_argument(
"--kafka-request-timeout-ms",
type=int,
default=int(os.getenv("KAFKA_REQUEST_TIMEOUT_MS", 30000)),
help="The client is going to wait this much time for the server to respond to a request (also use env variable KAFKA_REQUEST_TIMEOUT_MS)",
)
parser.add_argument(
"--kafka-max_block_ms",
type=int,
default=int(os.getenv("KAFKA_MAX_BLOCK_MS", 60000)),
help="Max time to block send e.g. because buffer is full (also use env variable KAFKA_MAX_BLOCK_MS)",
)
parser.add_argument(
"--kafka-linger-ms",
type=int,
default=int(os.getenv("KAFKA_LINGER_MS", 0)),
help="Max time to wait for more messages when creating batch (also use env variable KAFKA_LINGER_MS)",
)
parser.add_argument(
"--kafka-compression-type",
choices=[None, "gzip", "snappy", "lz4"],
default=os.getenv("KAFKA_COMPRESSION_TYPE", None),
help="The compression type for all data generated by the producer (also use env variable KAFKA_COMPRESSION_TYPE)",
)
parser.add_argument(
"--kafka-batch-size",
type=int,
default=int(os.getenv("KAFKA_BATCH_SIZE", 16384)),
help="Max size of the batch before sending (also use env variable KAFKA_BATCH_SIZE)",
)
parser.add_argument(
"--kafka-buffer-memory",
type=int,
default=int(os.getenv("KAFKA_BUFFER_MEMORY", 33554432)),
help="Memory the producer can use at max for batching (also use env variable KAFKA_BUFFER_MEMORY)",
)
parser.add_argument(
"--kafka-retries",
type=int,
default=int(os.getenv("KAFKA_RETRIES", 0)),
help="Resend any record whose send fails this many times. Can cause duplicates! (also use env variable KAFKA_RETRIES)",
)


def add_mosquitto_opts(parser):
6 changes: 3 additions & 3 deletions opl/cluster_read.py
@@ -10,8 +10,7 @@
import jinja2
import jinja2.exceptions
import boto3

from requests.packages.urllib3.exceptions import InsecureRequestWarning
import urllib3

from . import data
from . import date
@@ -115,7 +114,7 @@ def measure(self, ri, name, monitoring_query, monitoring_step):
"start": ri.start.timestamp(),
"end": ri.end.timestamp(),
}
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
response = requests.get(
url, headers=headers, params=params, verify=False, timeout=60
)
@@ -200,6 +199,7 @@ def measure(self, ri, name, grafana_target):
}
url = f"{self.args.grafana_host}:{self.args.grafana_port}/api/datasources/proxy/{self.args.grafana_datasource}/render"

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
r = requests.post(
url=url, headers=headers, params=params, timeout=60, verify=False
)
46 changes: 34 additions & 12 deletions opl/consumer_lag.py
@@ -12,26 +12,48 @@ class ConsumerLag:
bootstrap_server and kafka group as input.
"""

def __init__(self, topic, bootstrap_servers, group) -> None:
def __init__(
self, topic, bootstrap_servers, group, username="", password=""
) -> None:
self.topic = topic
self.group = group
self.bootstrap_servers = bootstrap_servers
self.logger = logging.getLogger("consumer_lag")
self.offset_records = {}
self.username = username
self.password = password

def _getconsumer(self):
consumer = KafkaConsumer(
bootstrap_servers=self.bootstrap_servers,
auto_offset_reset="latest",
enable_auto_commit=False,
max_poll_records=50,
max_poll_interval_ms=300000,
group_id=self.group,
session_timeout_ms=50000,
heartbeat_interval_ms=10000,
consumer_timeout_ms=100000,
)
# Common parameters for both cases
common_params = {
"bootstrap_servers": self.bootstrap_servers,
"auto_offset_reset": "latest",
"enable_auto_commit": False,
"max_poll_records": 50,
"max_poll_interval_ms": 300000,
"group_id": self.group,
"session_timeout_ms": 50000,
"heartbeat_interval_ms": 10000,
"consumer_timeout_ms": 100000,
}

# Kafka consumer creation: SASL or noauth
if self.username != "" and self.password != "":
logging.info(
f"Creating SASL password-protected Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
)
sasl_params = {
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": self.username,
"sasl_plain_password": self.password,
}
consumer = KafkaConsumer(**common_params, **sasl_params)
else:
logging.info(
f"Creating passwordless Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
)
consumer = KafkaConsumer(**common_params)
return consumer

def store_offset_records(self):
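A minimal usage sketch of `ConsumerLag` with the new credential parameters (topic, broker, group and credentials are placeholder values; `store_offset_records` is taken from the diff above):

```python
from opl.consumer_lag import ConsumerLag  # import path assumed from the file location

# Passwordless cluster: leave username/password at their "" defaults.
lag = ConsumerLag(
    topic="perf-test-topic",                     # placeholder
    bootstrap_servers="kafka.example.com:9092",  # placeholder
    group="perf-test-group",
)

# SASL/SCRAM-protected cluster (e.g. MSK): passing credentials triggers the SASL_SSL branch.
lag_sasl = ConsumerLag(
    topic="perf-test-topic",
    bootstrap_servers="b-1.msk.example.amazonaws.com:9096",
    group="perf-test-group",
    username="perf-user",            # placeholder credentials
    password="not-a-real-password",
)
lag_sasl.store_offset_records()
```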
46 changes: 29 additions & 17 deletions opl/get_kafka_times.py
@@ -29,11 +29,13 @@ def __init__(self, args, status_data, custom_methods):
}
self.connection = psycopg2.connect(**storage_db_conf)
self.status_data = status_data
self.kafka_hosts = [f"{args.kafka_host}:{args.kafka_port}"]
self.kafka_host = f"{args.kafka_host}:{args.kafka_port}"
self.kafka_group = args.kafka_group
self.kafka_topic = args.kafka_topic
self.kafka_timeout = args.kafka_timeout
self.kafka_max_poll_records = 100
self.kafka_username = args.kafka_username
self.kafka_password = args.kafka_password
self.queries_definition = yaml.load(
args.tables_definition, Loader=yaml.SafeLoader
)["queries"]
@@ -78,26 +80,36 @@ def kafka_ts2dt(self, timestamp):

def create_consumer(self):
# Store Kafka config to status data
self.status_data.set("parameters.kafka.bootstrap", self.kafka_hosts[0])
self.status_data.set("parameters.kafka.bootstrap", self.kafka_host)
self.status_data.set("parameters.kafka.group", self.kafka_group)
self.status_data.set("parameters.kafka.topic", self.kafka_topic)
self.status_data.set("parameters.kafka.timeout", self.kafka_timeout)

# Create Kafka consumer
consumer = KafkaConsumer(
self.kafka_topic,
bootstrap_servers=self.kafka_hosts,
auto_offset_reset="earliest",
enable_auto_commit=True,
group_id=self.kafka_group,
max_poll_records=self.kafka_max_poll_records,
session_timeout_ms=50000,
heartbeat_interval_ms=10000,
consumer_timeout_ms=self.kafka_timeout,
)
logging.debug(
f"Created Kafka consumer for {self.kafka_hosts} for {self.kafka_topic} topic in group {self.kafka_group} with {self.kafka_timeout} ms timeout"
)
common_params = {
"bootstrap_servers": self.kafka_host,
"auto_offset_reset": "earliest",
"enable_auto_commit": True,
"group_id": self.kafka_group,
"max_poll_records": self.kafka_max_poll_records,
"session_timeout_ms": 50000,
"heartbeat_interval_ms": 10000,
"consumer_timeout_ms": self.kafka_timeout,
}

if self.kafka_username != "" and self.kafka_password != "":
logging.info(
f"Creating consumer with sasl username&pasword to {self.kafka_host}"
)
sasl_params = {
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": self.kafka_username,
"sasl_plain_password": self.kafka_password,
}
consumer = KafkaConsumer(self.kafka_topic, **common_params, **sasl_params)
else:
logging.info(f"Creating passwordless consumer to {self.kafka_host}")
consumer = KafkaConsumer(self.kafka_topic, **common_params)
return consumer

def store_now(self):
(Diffs for the remaining 3 changed files were not loaded.)
