[feature](docker suite) make docker suite not require an external doris cluster (apache#40787)

1. Make the docker suite not require an external doris cluster:
a. run a docker suite according to ClusterOptions.cloudMode = true/false;
b. if ClusterOptions.cloudMode = null, the user can set it from the command line: `sh run-regression-test.sh --run docker_action -runMode=cloud/not_cloud` (see the sketch after this list).

2. Refactor database.py and simplify its logic.

3. Add a `-v` command option to doris compose for debug logging.

4. Make doris compose flush its output buffer when printing.
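
A minimal Python sketch of the mode-resolution rule in item 1 (the function name and the behavior when both values are unset are assumptions, not code from this commit):

```
# Hypothetical sketch of how a docker suite's effective mode is chosen.
def resolve_cloud_mode(cloud_mode, run_mode):
    """cloud_mode: ClusterOptions.cloudMode -- True, False, or None.
    run_mode: "cloud" / "not_cloud" from -runMode, or None."""
    if cloud_mode is not None:
        return cloud_mode              # the suite pins its own mode
    if run_mode is not None:
        return run_mode == "cloud"     # the command line decides
    # What happens when both are unset is not specified in the commit message.
    raise ValueError("cloudMode is null and no -runMode was given")
```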
yujun777 authored and dataroaring committed Oct 10, 2024
1 parent b9ab5fd commit 57751d2
Showing 21 changed files with 229 additions and 137 deletions.
2 changes: 1 addition & 1 deletion docker/runtime/doris-compose/Dockerfile
@@ -38,7 +38,7 @@ RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list
RUN apt-get clean

RUN apt-get update && \
apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq procps && \
apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq procps util-linux && \
ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
dpkg-reconfigure -f noninteractive tzdata && \
apt-get clean
19 changes: 19 additions & 0 deletions docker/runtime/doris-compose/Readme.md
@@ -154,3 +154,22 @@ steps:
2. Generate regression-conf-custom.groovy: `python docker/runtime/doris-compose/doris-compose.py config my-cluster <doris-root-path> --connect-follow-fe`
3. Run regression test: `bash run-regression-test.sh --run -times 1 -parallel 1 -suiteParallel 1 -d cloud/multi_cluster`

## Problem investigation

#### Log

Each cluster keeps its logs in /tmp/doris/{cluster-name}/{node-xxx}/log. For each node, doris compose also writes a health log to /tmp/doris/{cluster-name}/{node-xxx}/log/health.out.
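
As an example, a tiny Python helper (hypothetical, not part of doris-compose) that assembles the health log path described above:

```
import os

# Hypothetical helper: build the health.out path from the layout above.
def health_log_path(cluster_name, node):
    return os.path.join("/tmp/doris", cluster_name, node, "log", "health.out")

print(health_log_path("my-cluster", "fe-1"))
```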

#### Up cluster using non-detach mode

```
python docker/runtime/doris-compose/doris-compose.py up ... -no-detach
```

## Developer

Before submitting code, please format the code.

```
bash format-code.sh
```
6 changes: 3 additions & 3 deletions docker/runtime/doris-compose/cluster.py
@@ -16,7 +16,6 @@
# under the License.

import filelock
import json
import jsonpickle
import os
import os.path
@@ -405,11 +404,12 @@ def get_add_init_config(self):
if self.cluster.is_cloud:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()), "",
self.cluster.get_meta_server_addr()),
"",
"# For regression-test",
"ignore_unsupported_properties_in_cloud_mode = true",
"merge_on_write_forced_to_false = true",
"deploy_mode = cloud"
"deploy_mode = cloud",
]

if self.cluster.sql_mode_node_mgr:
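For reference, the cloud-mode branch above renders into an fe.conf fragment like the following (the meta_service_endpoint value is cluster-specific, shown here as a placeholder):

```
meta_service_endpoint = <meta-server-addr>

# For regression-test
ignore_unsupported_properties_in_cloud_mode = true
merge_on_write_forced_to_false = true
deploy_mode = cloud
```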
33 changes: 15 additions & 18 deletions docker/runtime/doris-compose/command.py
@@ -92,7 +92,12 @@ def add_parser(self, args_parsers):
def run(self, args):
raise Exception("No implemented")

def _add_parser_output_json(self, parser):
def _add_parser_common_args(self, parser):
parser.add_argument("-v",
"--verbose",
default=False,
action=self._get_parser_bool_action(True),
help="verbose logging.")
parser.add_argument("--output-json",
default=False,
action=self._get_parser_bool_action(True),
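
These shared flags can be exercised on their own; below is a minimal standalone sketch that assumes `_get_parser_bool_action(True)` reduces to argparse's `store_true` (the `--output-json` help text is illustrative, since the diff truncates it):

```
import argparse

# Standalone sketch of the flags added by _add_parser_common_args.
parser = argparse.ArgumentParser(prog="doris-compose")
parser.add_argument("-v", "--verbose",
                    default=False,
                    action="store_true",
                    help="verbose logging.")
parser.add_argument("--output-json",
                    default=False,
                    action="store_true",
                    help="output in json format.")  # assumed help text

args = parser.parse_args(["-v"])
assert args.verbose and not args.output_json
```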
@@ -150,7 +155,7 @@ def add_parser(self, args_parsers):
parser = args_parsers.add_parser(self.command, help=help)
parser.add_argument("NAME", help="Specify cluster name.")
self._add_parser_ids_args(parser)
self._add_parser_output_json(parser)
self._add_parser_common_args(parser)

def run(self, args):
cluster = CLUSTER.Cluster.load(args.NAME)
@@ -180,6 +185,7 @@ def add_parser(self, args_parsers):
nargs="?",
help="Specify docker image.")

self._add_parser_common_args(parser)
parser.add_argument(
"--cloud",
default=False,
@@ -197,8 +203,6 @@
"> 0 max wait seconds, -1 wait unlimited."
)

self._add_parser_output_json(parser)

group1 = parser.add_argument_group("add new nodes",
"add cluster nodes.")
group1.add_argument(
@@ -325,16 +329,14 @@ def add_parser(self, args_parsers):
"--be-cluster-id",
default=True,
action=self._get_parser_bool_action(False),
help=
"Do not set BE cluster ID in conf. Default is False.")
help="Do not set BE cluster ID in conf. Default is False.")
else:
parser.add_argument(
"--no-be-cluster-id",
dest='be_cluster_id',
default=True,
action=self._get_parser_bool_action(False),
help=
"Do not set BE cluser ID in conf. Default is False.")
help="Do not set BE cluser ID in conf. Default is False.")

parser.add_argument(
"--fdb-version",
@@ -669,7 +671,7 @@ def add_parser(self, args_parsers):
"then apply to all containers.")
parser.add_argument("NAME", help="Specify cluster name")
self._add_parser_ids_args(parser)
self._add_parser_output_json(parser)
self._add_parser_common_args(parser)
parser.add_argument(
"--clean",
default=False,
@@ -782,12 +784,9 @@ def __init__(self):
self.created = ""
self.alive = ""
self.is_master = ""
self.query_port = ""
self.tablet_num = ""
self.last_heartbeat = ""
self.err_msg = ""
self.edit_log_port = 0
self.heartbeat_port = 0

def info(self, detail):
result = [
@@ -825,10 +824,8 @@ def update_db_info(self, db_mgr):
if fe:
self.alive = str(fe.alive).lower()
self.is_master = str(fe.is_master).lower()
self.query_port = fe.query_port
self.last_heartbeat = fe.last_heartbeat
self.err_msg = fe.err_msg
self.edit_log_port = fe.edit_log_port
elif self.node_type == CLUSTER.Node.TYPE_BE:
self.backend_id = -1
be = db_mgr.get_be(self.id)
@@ -838,7 +835,6 @@
self.tablet_num = be.tablet_num
self.last_heartbeat = be.last_heartbeat
self.err_msg = be.err_msg
self.heartbeat_port = be.heartbeat_port


class GenConfCommand(Command):
@@ -977,7 +973,7 @@ def add_parser(self, args_parsers):
help=
"Specify multiple clusters, if specific, show all their containers."
)
self._add_parser_output_json(parser)
self._add_parser_common_args(parser)
parser.add_argument("--detail",
default=False,
action=self._get_parser_bool_action(True),
@@ -1021,7 +1017,8 @@ def parse_cluster_compose_file(cluster_name):
if services is None:
return COMPOSE_BAD, {}
return COMPOSE_GOOD, {
service: ComposeService(
service:
ComposeService(
service,
list(service_conf["networks"].values())[0]
["ipv4_address"], service_conf["image"])
@@ -1186,7 +1183,7 @@ def add_parser(self, args_parsers):
help=
"Specify multiple clusters, if specific, show all their containers."
)
self._add_parser_output_json(parser)
self._add_parser_common_args(parser)

def _handle_data(self, header, datas):
if utils.is_enable_log():
129 changes: 55 additions & 74 deletions docker/runtime/doris-compose/database.py
@@ -27,50 +27,43 @@

class FEState(object):

def __init__(self, id, query_port, is_master, alive, last_heartbeat,
err_msg, edit_log_port):
def __init__(self, id, is_master, alive, last_heartbeat, err_msg):
self.id = id
self.query_port = query_port
self.is_master = is_master
self.alive = alive
self.last_heartbeat = last_heartbeat
self.err_msg = err_msg
self.edit_log_port = edit_log_port


class BEState(object):

def __init__(self, id, backend_id, decommissioned, alive, tablet_num,
last_heartbeat, err_msg, heartbeat_port):
last_heartbeat, err_msg):
self.id = id
self.backend_id = backend_id
self.decommissioned = decommissioned
self.alive = alive
self.tablet_num = tablet_num
self.last_heartbeat = last_heartbeat
self.err_msg = err_msg
self.heartbeat_port = heartbeat_port


class DBManager(object):

def __init__(self):
self.fe_states = {}
self.be_states = {}
self.query_port = -1
self.conn = None

def set_query_port(self, query_port):
self.query_port = query_port
self.master_fe_ip = ""

def get_fe(self, id):
return self.fe_states.get(id, None)

def get_be(self, id):
return self.be_states.get(id, None)

def load_states(self, query_ports):
self._load_fe_states(query_ports)
def load_states(self):
self._load_fe_states()
self._load_be_states()

def add_fe(self, fe_endpoint):
@@ -189,108 +182,96 @@ def create_default_storage_vault(self, cloud_store_config):
LOG.error(f"Failed to create default storage vault: {str(e)}")
raise

def _load_fe_states(self, query_ports):
def _load_fe_states(self):
fe_states = {}
alive_master_fe_port = None
for record in self._exec_query('''
show frontends '''):
# Unpack the record into individual columns
name, ip, edit_log_port, _, query_port, _, _, role, is_master, cluster_id, _, alive, _, _, last_heartbeat, _, err_msg, _, _ = record
is_master = utils.is_true(is_master)
alive = utils.is_true(alive)
alive_master_fe_ip = None
for record in self._exec_query("show frontends"):
name = record["Name"]
ip = record["Host"]
role = record["Role"]
is_master = utils.is_true(record["IsMaster"])
alive = utils.is_true(record["Alive"])
id = CLUSTER.Node.get_id_from_ip(ip)
query_port = query_ports.get(id, "")
last_heartbeat = utils.escape_null(last_heartbeat)
fe = FEState(id, query_port, is_master, alive, last_heartbeat,
err_msg, edit_log_port)
last_heartbeat = utils.escape_null(record["LastHeartbeat"])
err_msg = record["ErrMsg"]
fe = FEState(id, is_master, alive, last_heartbeat, err_msg)
fe_states[id] = fe
if is_master and alive and query_port:
alive_master_fe_port = query_port
LOG.info(
if is_master and alive:
alive_master_fe_ip = ip
LOG.debug(
"record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}"
.format(name, ip, alive, is_master, role))

self.fe_states = fe_states
if alive_master_fe_port and alive_master_fe_port != self.query_port:
self.query_port = alive_master_fe_port
if alive_master_fe_ip and alive_master_fe_ip != self.master_fe_ip:
self.master_fe_ip = alive_master_fe_ip
self._reset_conn()

def _load_be_states(self):
be_states = {}
for record in self._exec_query('''
select BackendId, Host, LastHeartbeat, Alive, SystemDecommissioned, TabletNum, ErrMsg, HeartbeatPort
from backends()'''):
backend_id, ip, last_heartbeat, alive, decommissioned, tablet_num, err_msg, heartbeat_port = record
backend_id = int(backend_id)
alive = utils.is_true(alive)
decommissioned = utils.is_true(decommissioned)
tablet_num = int(tablet_num)
id = CLUSTER.Node.get_id_from_ip(ip)
last_heartbeat = utils.escape_null(last_heartbeat)
heartbeat_port = utils.escape_null(heartbeat_port)
for record in self._exec_query("show backends"):
backend_id = int(record["BackendId"])
alive = utils.is_true(record["Alive"])
decommissioned = utils.is_true(record["SystemDecommissioned"])
tablet_num = int(record["TabletNum"])
id = CLUSTER.Node.get_id_from_ip(record["Host"])
last_heartbeat = utils.escape_null(record["LastHeartbeat"])
err_msg = record["ErrMsg"]
be = BEState(id, backend_id, decommissioned, alive, tablet_num,
last_heartbeat, err_msg, heartbeat_port)
last_heartbeat, err_msg)
be_states[id] = be
self.be_states = be_states

# return rows, and each row is a record map
def _exec_query(self, sql):
self._prepare_conn()
with self.conn.cursor() as cursor:
cursor.execute(sql)
return cursor.fetchall()
fields = [field_md[0] for field_md in cursor.description
] if cursor.description else []
return [dict(zip(fields, row)) for row in cursor.fetchall()]

def _prepare_conn(self):
if self.conn:
return
if self.query_port <= 0:
raise Exception("Not set query_port")
self._reset_conn()

def _reset_conn(self):
self.conn = pymysql.connect(user="root",
host="127.0.0.1",
host=self.master_fe_ip,
read_timeout=10,
port=self.query_port)
connect_timeout=3,
port=CLUSTER.FE_QUERY_PORT)


def get_db_mgr(cluster_name, required_load_succ=True):
assert cluster_name
db_mgr = DBManager()
containers = utils.get_doris_containers(cluster_name).get(
cluster_name, None)
if not containers:
master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
"master_fe_ip")
master_fe_ip = None
if os.path.exists(master_fe_ip_file):
with open(master_fe_ip_file, "r") as f:
master_fe_ip = f.read().strip()

if not master_fe_ip:
return db_mgr
alive_fe_ports = {}

has_alive_fe = False
containers = utils.get_doris_containers(cluster_name).get(cluster_name, [])
for container in containers:
if utils.is_container_running(container):
_, node_type, id = utils.parse_service_name(container.name)
_, node_type, _ = utils.parse_service_name(container.name)
if node_type == CLUSTER.Node.TYPE_FE:
query_port = utils.get_map_ports(container).get(
CLUSTER.FE_QUERY_PORT, None)
if query_port:
alive_fe_ports[id] = query_port
if not alive_fe_ports:
has_alive_fe = True
break

if not has_alive_fe:
return db_mgr

master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
"master_fe_ip")
query_port = None
if os.path.exists(master_fe_ip_file):
with open(master_fe_ip_file, "r") as f:
master_fe_ip = f.read()
if master_fe_ip:
master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip)
query_port = alive_fe_ports.get(master_id, None)
if not query_port:
# A new cluster's master is fe-1
if 1 in alive_fe_ports:
query_port = alive_fe_ports[1]
else:
query_port = list(alive_fe_ports.values())[0]

db_mgr.set_query_port(query_port)
db_mgr.master_fe_ip = master_fe_ip
try:
db_mgr.load_states(alive_fe_ports)
db_mgr.load_states()
except Exception as e:
if required_load_succ:
raise e
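Taken together, the new database.py flow is: read the master FE ip from the status file written by doris compose, connect to it on the fixed FE query port, and fetch rows as column-name-keyed dicts. A self-contained sketch under stated assumptions (the status-file path and port 9030 are illustrative; the real values come from CLUSTER.get_status_path and CLUSTER.FE_QUERY_PORT):

```
import pymysql

# Assumed layout; doris-compose derives this via CLUSTER.get_status_path().
status_file = "/tmp/doris/my-cluster/status/master_fe_ip"
with open(status_file, "r") as f:
    master_fe_ip = f.read().strip()

conn = pymysql.connect(user="root",
                       host=master_fe_ip,
                       read_timeout=10,
                       connect_timeout=3,
                       port=9030)  # FE query port; 9030 is the Doris default
with conn.cursor() as cursor:
    cursor.execute("show frontends")
    # cursor.description holds one metadata tuple per column; index 0 is the name.
    fields = [md[0] for md in cursor.description] if cursor.description else []
    rows = [dict(zip(fields, row)) for row in cursor.fetchall()]

for r in rows:
    print(r["Host"], r["IsMaster"], r["Alive"])
```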