diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index 70a22970b13dcd..5df74ac2c3da45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -109,7 +109,7 @@ public synchronized void rollJournal() { long currentName = Long.parseLong(currentDbName); long newNameVerify = currentName + currentJournalDB.count(); if (newName == newNameVerify) { - LOG.info("roll edit log. new db name is {}", newName); + LOG.info("roll edit log. new dbName: {}, old dbName:{}", newName, currentDbName); currentJournalDB = bdbEnvironment.openDatabase(Long.toString(newName)); } else { String msg = String.format("roll journal error! journalId and db journal numbers is not match. " diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java new file mode 100644 index 00000000000000..98f1fc57f991bd --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.journal.bdbje; + +import org.apache.doris.catalog.Env; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.LockMode; +import com.sleepycat.je.OperationStatus; +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.net.SocketException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.List; + +public class BDBEnvironmentTest { + private static final Logger LOG = LogManager.getLogger(BDBEnvironmentTest.class); + private static List tmpDirs = new ArrayList<>(); + + public static File createTmpDir() throws Exception { + String dorisHome = System.getenv("DORIS_HOME"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(dorisHome)); + File dir = Files.createTempDirectory(Paths.get(dorisHome, "fe", "mocked"), "BDBEnvironmentTest").toFile(); + LOG.debug("createTmpDir path {}", dir.getAbsolutePath()); + tmpDirs.add(dir); + return dir; + } + + @AfterAll + public static void cleanUp() throws Exception { + for (File dir : tmpDirs) { + LOG.info("deleteTmpDir path {}", dir.getAbsolutePath()); + FileUtils.deleteDirectory(dir); + } + } + + private int findValidPort() { + int port = 0; + for (int i = 0; i < 65535; i++) { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + port = socket.getLocalPort(); + try (DatagramSocket datagramSocket = new DatagramSocket(port)) { + datagramSocket.setReuseAddress(true); + break; + } catch (SocketException e) { + LOG.info("The port {} is invalid and try another port", port); + } + } catch (IOException e) { + throw new IllegalStateException("Could not find a free TCP/IP port"); + } + } + Preconditions.checkArgument(((port > 0) && (port < 65536))); + return port; + } + + private byte[] randomBytes() { + byte[] byteArray = new byte[32]; + new SecureRandom().nextBytes(byteArray); + return byteArray; + } + + @Test + public void testSetup() throws Exception { + int port = findValidPort(); + String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false); + String selfNodeHostPort = "127.0.0.1:" + port; + LOG.debug("selfNodeName:{}, selfNodeHostPort:{}", selfNodeName, selfNodeHostPort); + + BDBEnvironment bdbEnvironment = new BDBEnvironment(); + bdbEnvironment.setup(createTmpDir(), selfNodeName, selfNodeHostPort, selfNodeHostPort, true); + + String dbName = "testEnvironment"; + Database db = bdbEnvironment.openDatabase(dbName); + DatabaseEntry key = new DatabaseEntry(randomBytes()); + DatabaseEntry value = new DatabaseEntry(randomBytes()); + + Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key, value)); + + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key, readValue, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); + + // Remove database + bdbEnvironment.removeDatabase(dbName); + Exception exception = Assertions.assertThrows(IllegalStateException.class, () -> { + db.put(null, key, value); + }); + + String expectedMessage = "Database was closed."; + String actualMessage = exception.getMessage(); + LOG.debug("exception:", exception); + Assertions.assertTrue(actualMessage.contains(expectedMessage)); + + bdbEnvironment.close(); + exception = Assertions.assertThrows(IllegalStateException.class, () -> { + db.put(null, key, value); + }); + expectedMessage = "Environment is closed."; + actualMessage = exception.getMessage(); + LOG.debug("exception:", exception); + Assertions.assertTrue(actualMessage.contains(expectedMessage)); + } + + /** + * Test build a BDBEnvironment cluster (1 master + 2 follower + 1 observer) + * @throws Exception + */ + @Test + public void testCluster() throws Exception { + int masterPort = findValidPort(); + String masterNodeName = Env.genFeNodeName("127.0.0.1", masterPort, false); + String masterNodeHostPort = "127.0.0.1:" + masterPort; + LOG.debug("masterNodeName:{}, masterNodeHostPort:{}", masterNodeName, masterNodeHostPort); + + BDBEnvironment masterEnvironment = new BDBEnvironment(); + File masterDir = createTmpDir(); + masterEnvironment.setup(masterDir, masterNodeName, masterNodeHostPort, masterNodeHostPort, true); + + List followerEnvironments = new ArrayList<>(); + List followerDirs = new ArrayList<>(); + for (int i = 1; i <= 2; i++) { + int followerPort = findValidPort(); + String followerNodeName = Env.genFeNodeName("127.0.0.1", followerPort, false); + String followerNodeHostPort = "127.0.0.1:" + followerPort; + LOG.debug("followerNodeName{}:{}, followerNodeHostPort{}:{}", i, i, followerNodeName, followerNodeHostPort); + + BDBEnvironment followerEnvironment = new BDBEnvironment(); + File followerDir = createTmpDir(); + followerDirs.add(followerDir); + followerEnvironment.setup(followerDir, followerNodeName, followerNodeHostPort, masterNodeHostPort, true); + followerEnvironments.add(followerEnvironment); + } + + int observerPort = findValidPort(); + String observerNodeName = Env.genFeNodeName("127.0.0.1", observerPort, false); + String observerNodeHostPort = "127.0.0.1:" + observerPort; + LOG.debug("observerNodeName:{}, observerNodeHostPort:{}", observerNodeName, observerNodeHostPort); + + BDBEnvironment observerEnvironment = new BDBEnvironment(); + File observerDir = createTmpDir(); + observerEnvironment.setup(observerDir, observerNodeName, observerNodeHostPort, masterNodeHostPort, false); + + String dbName = "1234"; + Database masterDb = masterEnvironment.openDatabase(dbName); + DatabaseEntry key = new DatabaseEntry(randomBytes()); + DatabaseEntry value = new DatabaseEntry(randomBytes()); + Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value)); + + for (BDBEnvironment followerEnvironment : followerEnvironments) { + Assertions.assertEquals(1, followerEnvironment.getDatabaseNames().size()); + Database followerDb = followerEnvironment.openDatabase(dbName); + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); + } + + Assertions.assertEquals(1, observerEnvironment.getDatabaseNames().size()); + Database observerDb = observerEnvironment.openDatabase(dbName); + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, observerDb.get(null, key, readValue, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); + + observerEnvironment.close(); + followerEnvironments.stream().forEach(environment -> { + environment.close(); }); + masterEnvironment.close(); + + masterEnvironment.openReplicatedEnvironment(masterDir); + for (int i = 0; i < 2; i++) { + followerEnvironments.get(i).openReplicatedEnvironment(followerDirs.get(i)); + } + observerEnvironment.openReplicatedEnvironment(observerDir); + + observerEnvironment.close(); + followerEnvironments.stream().forEach(environment -> { + environment.close(); }); + masterEnvironment.close(); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java new file mode 100644 index 00000000000000..57386915ca2330 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java @@ -0,0 +1,192 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.journal.bdbje; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Pair; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.journal.Journal; +import org.apache.doris.journal.JournalCursor; +import org.apache.doris.journal.JournalEntity; +import org.apache.doris.persist.OperationType; +import org.apache.doris.system.SystemInfoService.HostInfo; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import mockit.Mock; +import mockit.MockUp; +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.net.SocketException; +import java.nio.file.Files; +import java.nio.file.Paths; + +public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS LINE: BDBJE should use uppercase + private static final Logger LOG = LogManager.getLogger(BDBJEJournalTest.class); + private static File tmpDir; + + @BeforeAll + public static void setUp() throws Exception { + String dorisHome = System.getenv("DORIS_HOME"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(dorisHome)); + tmpDir = Files.createTempDirectory(Paths.get(dorisHome, "fe", "mocked"), "BDBJEJournalTest").toFile(); + LOG.debug("tmpDir path {}", tmpDir.getAbsolutePath()); + return; + } + + @AfterAll + public static void cleanUp() throws Exception { + FileUtils.deleteDirectory(tmpDir); + } + + private int findValidPort() { + int port = 0; + for (int i = 0; i < 65535; i++) { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + port = socket.getLocalPort(); + try (DatagramSocket datagramSocket = new DatagramSocket(port)) { + datagramSocket.setReuseAddress(true); + break; + } catch (SocketException e) { + LOG.info("The port {} is invalid and try another port", port); + } + } catch (IOException e) { + throw new IllegalStateException("Could not find a free TCP/IP port"); + } + } + return port; + } + + @Test + public void testNormal() throws Exception { + int port = findValidPort(); + Preconditions.checkArgument(((port > 0) && (port < 65535))); + String nodeName = Env.genFeNodeName("127.0.0.1", port, false); + long replayedJournalId = 0; + new MockUp() { + HostInfo selfNode = new HostInfo("127.0.0.1", port); + @Mock + public String getBdbDir() { + return tmpDir.getAbsolutePath(); + } + + @Mock + public HostInfo getSelfNode() { + return this.selfNode; + } + + @Mock + public HostInfo getHelperNode() { + return this.selfNode; + } + + @Mock + public boolean isElectable() { + return true; + } + + @Mock + public long getReplayedJournalId() { + return replayedJournalId; + } + }; + + LOG.info("BdbDir:{}, selfNode:{}, nodeName:{}", Env.getServingEnv().getBdbDir(), + Env.getServingEnv().getBdbDir(), nodeName); + Assertions.assertEquals(tmpDir.getAbsolutePath(), Env.getServingEnv().getBdbDir()); + Journal journal = new BDBJEJournal(nodeName); + journal.open(); + journal.rollJournal(); + for (int i = 0; i < 10; i++) { + String data = "OperationType.OP_TIMESTAMP"; + Writable writable = new Writable() { + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, data); + } + }; + journal.write(OperationType.OP_TIMESTAMP, writable); + } + + Assertions.assertEquals(10, journal.getMaxJournalId()); + Assertions.assertEquals(10, journal.getJournalNum()); + Assertions.assertEquals(1, journal.getMinJournalId()); + Assertions.assertEquals(0, journal.getFinalizedJournalId()); + + LOG.debug("journal.getDatabaseNames(): {}", journal.getDatabaseNames()); + Assertions.assertEquals(1, journal.getDatabaseNames().size()); + Assertions.assertEquals(1, journal.getDatabaseNames().get(0)); + + JournalEntity journalEntity = journal.read(1); + Assertions.assertEquals(OperationType.OP_TIMESTAMP, journalEntity.getOpCode()); + + for (int i = 10; i < 50; i++) { + if (i % 10 == 0) { + journal.rollJournal(); + } + String data = "OperationType.OP_TIMESTAMP"; + Writable writable = new Writable() { + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, data); + } + }; + journal.write(OperationType.OP_TIMESTAMP, writable); + } + + Assertions.assertEquals(50, journal.getMaxJournalId()); + Assertions.assertEquals(10, journal.getJournalNum()); + Assertions.assertEquals(1, journal.getMinJournalId()); + Assertions.assertEquals(40, journal.getFinalizedJournalId()); + + LOG.debug("journal.getDatabaseNames(): {}", journal.getDatabaseNames()); + Assertions.assertEquals(5, journal.getDatabaseNames().size()); + Assertions.assertEquals(41, journal.getDatabaseNames().get(4)); + + JournalCursor cursor = journal.read(1, 50); + Assertions.assertNotNull(cursor); + for (int i = 1; i < 50; i++) { + Pair kv = cursor.next(); + Assertions.assertNotNull(kv); + JournalEntity entity = kv.second; + Assertions.assertEquals(OperationType.OP_TIMESTAMP, entity.getOpCode()); + } + + journal.close(); + + journal.open(); + journal.deleteJournals(21); + LOG.info("journal.getDatabaseNames(): {}", journal.getDatabaseNames()); + Assertions.assertEquals(3, journal.getDatabaseNames().size()); + Assertions.assertEquals(21, journal.getDatabaseNames().get(0)); + journal.close(); + } +}