diff --git a/pom.xml b/pom.xml index c914070c958..17a54382af0 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,7 @@ 1.7 1.7 2.7.3 - 1.5.0 + 1.6.1 4.11 2.11.8 2.11 @@ -98,6 +98,9 @@ smart-hadoop-support + + true + diff --git a/smart-alluxio-support/smart-alluxio/pom.xml b/smart-alluxio-support/smart-alluxio/pom.xml index c623bb24f83..68903a1c821 100644 --- a/smart-alluxio-support/smart-alluxio/pom.xml +++ b/smart-alluxio-support/smart-alluxio/pom.xml @@ -111,6 +111,11 @@ alluxio-core-client-fs ${alluxio.version} + + org.alluxio + alluxio-core-server-common + ${alluxio.version} + org.alluxio alluxio-minicluster @@ -157,6 +162,17 @@ smart-metastore 1.4.0-SNAPSHOT + + com.squareup + tape + 1.2.3 + + + org.mockito + mockito-all + 1.10.8 + test + @@ -179,4 +195,4 @@ - + \ No newline at end of file diff --git a/smart-alluxio-support/smart-alluxio/src/main/java/alluxio/master/journal/ufs/AlluxioJournalUtil.java b/smart-alluxio-support/smart-alluxio/src/main/java/alluxio/master/journal/ufs/AlluxioJournalUtil.java new file mode 100644 index 00000000000..af47dab6709 --- /dev/null +++ b/smart-alluxio-support/smart-alluxio/src/main/java/alluxio/master/journal/ufs/AlluxioJournalUtil.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package alluxio.master.journal.ufs; + +import alluxio.AlluxioURI; +import alluxio.Configuration; +import alluxio.Constants; +import alluxio.PropertyKey; +import alluxio.master.NoopMaster; +import alluxio.master.journal.JournalReader; +import alluxio.proto.journal.Journal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartdata.conf.SmartConf; +import org.smartdata.conf.SmartConfKeys; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + + +/** + * Util for reading the journal entries given a range of sequence numbers. + */ +public class AlluxioJournalUtil { + + public static final Logger LOG = LoggerFactory.getLogger(AlluxioJournalUtil.class); + + private static String sMaster = Constants.FILE_SYSTEM_MASTER_NAME; + + /** + * @param conf smart configuration + * @return the current entry sequence number + */ + public static Long getCurrentSeqNum(SmartConf conf) { + UfsJournal journal = + new UfsJournalSystem(getJournalLocation(conf), 0).createJournal(new NoopMaster(sMaster)); + UfsJournalFile currentLog; + try { + currentLog = UfsJournalSnapshot.getCurrentLog(journal); + } catch (IOException e) { + throw new RuntimeException(e); + } + long sn = -1L; + if (currentLog != null) { + try (JournalReader reader = new UfsJournalReader(journal, currentLog.getStart(), true)) { + Journal.JournalEntry entry; + while ((entry = reader.read()) != null) { + sn = entry.getSequenceNumber(); + if (sn >= Long.MAX_VALUE) { + break; + } + } + } catch (Exception e) { + LOG.error("Failed to read next journal entry.", e); + } + } + return sn; + } + + /** + * @param conf smart configuration + * @param startSn journal entry sequence number + * @return journal reader + */ + public static JournalReader getJournalReaderFromSn(SmartConf conf, Long startSn) { + UfsJournal journal = + new UfsJournalSystem(getJournalLocation(conf), 0).createJournal(new NoopMaster(sMaster)); + JournalReader reader = new UfsJournalReader(journal, startSn, true); + return reader; + } + + /** + * @param conf smart configuration + * @return the journal location + */ + private static URI getJournalLocation(SmartConf conf) { + String alluxioMasterJournalDir = conf.get( + SmartConfKeys.SMART_ALLUXIO_MASTER_JOURNAL_DIR_KEY, "/opt/alluxio/journal"); + Configuration.set(PropertyKey.MASTER_JOURNAL_FOLDER, alluxioMasterJournalDir); + String journalDirectory = Configuration.get(PropertyKey.MASTER_JOURNAL_FOLDER); + if (!journalDirectory.endsWith(AlluxioURI.SEPARATOR)) { + journalDirectory += AlluxioURI.SEPARATOR; + } + try { + return new URI(journalDirectory); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + +} \ No newline at end of file diff --git a/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/AlluxioStatesUpdateService.java b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/AlluxioStatesUpdateService.java index 4a30c9e8fe5..ab88ed3707b 100644 --- a/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/AlluxioStatesUpdateService.java +++ b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/AlluxioStatesUpdateService.java @@ -17,32 +17,42 @@ */ package org.smartdata.alluxio; +import alluxio.AlluxioURI; +import alluxio.client.WriteType; +import alluxio.client.file.FileOutStream; +import alluxio.client.file.FileSystem; +import alluxio.client.file.options.CreateFileOptions; +import alluxio.client.file.options.DeleteOptions; +import alluxio.exception.AlluxioException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartdata.SmartContext; -import org.smartdata.alluxio.metric.fetcher.AlluxioNamespaceFetcher; +import org.smartdata.alluxio.metric.fetcher.AlluxioEntryFetcher; import org.smartdata.metastore.MetaStore; import org.smartdata.metastore.StatesUpdateService; -import alluxio.client.file.FileSystem; - import java.io.IOException; +import java.net.InetAddress; +import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; /** * Polls metrics and events from Alluxio Server */ public class AlluxioStatesUpdateService extends StatesUpdateService { - + private static final String ALLUXIO_MOVER_ID_PATH = "/system/alluxio-mover.id"; + private volatile boolean inSafeMode; private FileSystem alluxioFs; private ScheduledExecutorService executorService; - private AlluxioNamespaceFetcher namespaceFetcher; + private AlluxioEntryFetcher alluxioEntryFetcher; + private FileOutStream moverIdOutStream; public static final Logger LOG = LoggerFactory.getLogger(AlluxioStatesUpdateService.class); public AlluxioStatesUpdateService(SmartContext context, MetaStore metaStore) { super(context, metaStore); + this.inSafeMode = true; } /** @@ -55,29 +65,69 @@ public void init() throws IOException { LOG.info("Initializing ..."); SmartContext context = getContext(); this.alluxioFs = AlluxioUtil.getAlluxioFs(context); + this.moverIdOutStream = checkAndMarkRunning(alluxioFs); this.executorService = Executors.newScheduledThreadPool(4); - this.namespaceFetcher = new AlluxioNamespaceFetcher(alluxioFs, metaStore, - AlluxioNamespaceFetcher.DEFAULT_INTERVAL, executorService); + this.alluxioEntryFetcher = new AlluxioEntryFetcher(alluxioFs, metaStore, + executorService, new EntryFetchFinishedCallBack(), context.getConf()); LOG.info("Initialized."); } + private class EntryFetchFinishedCallBack implements Callable { + @Override + public Object call() throws Exception { + inSafeMode = false; + return null; + } + } + + @Override + public boolean inSafeMode() { + return inSafeMode; + } + /** * Start daemon threads in StatesManager for function. */ @Override public void start() throws IOException { LOG.info("Starting ..."); - this.namespaceFetcher.startFetch(); + this.alluxioEntryFetcher.start(); LOG.info("Started. "); } @Override public void stop() throws IOException { LOG.info("Stopping ..."); - if (this.namespaceFetcher != null) { - this.namespaceFetcher.stop(); + if (moverIdOutStream != null) { + try { + moverIdOutStream.close(); + } catch (IOException e) { + LOG.debug("Close alluxio 'mover' ID output stream error", e); + } + } + if (alluxioEntryFetcher != null) { + alluxioEntryFetcher.stop(); } LOG.info("Stopped."); } + private FileOutStream checkAndMarkRunning(FileSystem fs) throws IOException { + AlluxioURI moverIdPath = new AlluxioURI(ALLUXIO_MOVER_ID_PATH); + try { + if (fs.exists(moverIdPath)) { + // Alluxio does not support append operation (ALLUXIO-25), here just delete it + fs.delete(moverIdPath, DeleteOptions.defaults().setRecursive(true)); + } + CreateFileOptions options = CreateFileOptions.defaults().setWriteType( + WriteType.MUST_CACHE); + FileOutStream fos = fs.createFile(moverIdPath, options); + fos.write(InetAddress.getLocalHost().getHostName().getBytes()); + fos.flush(); + return fos; + } catch (IOException | AlluxioException e) { + LOG.error("Unable to lock alluxio 'mover', please stop alluxio 'mover' first."); + throw new IOException(e.getMessage()); + } + } + } diff --git a/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/AlluxioUtil.java b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/AlluxioUtil.java index 582238303b6..b88eecadee6 100644 --- a/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/AlluxioUtil.java +++ b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/AlluxioUtil.java @@ -19,6 +19,7 @@ import java.io.IOException; +import alluxio.client.file.URIStatus; import org.smartdata.SmartContext; import org.smartdata.conf.SmartConfKeys; import org.slf4j.Logger; @@ -28,6 +29,7 @@ import alluxio.PropertyKey; import alluxio.client.file.FileSystem; import alluxio.client.file.FileSystemContext; +import org.smartdata.model.FileInfo; /** * Contain utils related to alluxio cluster. @@ -44,6 +46,22 @@ public static FileSystem getAlluxioFs(SmartContext context) throws IOException { FileSystemContext fsContext = FileSystemContext.create(); return FileSystem.Factory.get(fsContext); } - + + public static FileInfo convertFileStatus(URIStatus status) { + FileInfo fileInfo = new FileInfo( + status.getPath(), + status.getFileId(), + status.getLength(), + status.isFolder(), + (short)1, + status.getBlockSizeBytes(), + status.getLastModificationTimeMs(), + status.getCreationTimeMs(), + (short) status.getMode(), + status.getOwner(), + status.getGroup(), + (byte) 0); + return fileInfo; + } } diff --git a/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioEntryApplier.java b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioEntryApplier.java new file mode 100644 index 00000000000..ce9328124c7 --- /dev/null +++ b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioEntryApplier.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.alluxio.metric.fetcher; + +import alluxio.AlluxioURI; +import alluxio.client.file.FileSystem; +import alluxio.client.file.URIStatus; +import alluxio.exception.AlluxioException; +import alluxio.exception.ExceptionMessage; +import alluxio.proto.journal.File.*; +import alluxio.proto.journal.Journal.JournalEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartdata.alluxio.AlluxioUtil; +import org.smartdata.metastore.MetaStore; +import org.smartdata.metastore.MetaStoreException; +import org.smartdata.model.FileDiff; +import org.smartdata.model.FileDiffType; +import org.smartdata.model.FileInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class AlluxioEntryApplier { + + private static final Logger LOG = LoggerFactory.getLogger(AlluxioEntryApplier.class); + + private final MetaStore metaStore; + private FileSystem fs; + + public AlluxioEntryApplier(MetaStore metaStore, FileSystem fs) { + this.metaStore = metaStore; + this.fs = fs; + } + + public void apply(JournalEntry entry) throws IOException, MetaStoreException { + List statements = new ArrayList<>(); + List sqlist = processEntryToSql(entry); + if (sqlist != null && !sqlist.isEmpty()){ + for (String sql : sqlist) { + if (sql != null && sql.length() > 0) { + statements.add(sql); + } + } + } + this.metaStore.execute(statements); + } + + private List processEntryToSql(JournalEntry entry) throws IOException, MetaStoreException { + if (entry.hasInodeDirectory()) { + LOG.trace("entry type:" + entry.getInodeDirectory().getClass() + + ", id:" + entry.getInodeDirectory().getId()); + InodeDirectoryEntry inodeDirectoryEntry = entry.getInodeDirectory(); + String inodeDir = getPathFromInodeDir(inodeDirectoryEntry); + URIStatus dStatus = null; + try { + dStatus = fs.getStatus(new AlluxioURI(inodeDir)); + } catch (AlluxioException e) { + e.printStackTrace(); + } + FileInfo fileInfo = AlluxioUtil.convertFileStatus(dStatus); + metaStore.insertFile(fileInfo); + return Collections.singletonList(""); + } else if (entry.hasInodeFile()) { + LOG.trace("entry type:" + entry.getInodeFile().getClass() + + ", id:" + entry.getInodeFile().getId()); + String addSql = addInodeFileFromEntry(entry.getInodeFile()); + return Collections.singletonList(addSql); + } else if (entry.hasInodeLastModificationTime()) { + LOG.trace("entry type:" + entry.getInodeLastModificationTime().getClass() + + ", id:" + entry.getInodeLastModificationTime().getId()); + InodeLastModificationTimeEntry modTimeEntry = entry.getInodeLastModificationTime(); + String path = getPathByFileId(modTimeEntry.getId()); + FileDiff fileDiff = null; + if (inBackup(path)) { + fileDiff = new FileDiff(FileDiffType.METADATA); + fileDiff.setSrc(path); + } + if (fileDiff != null) { + fileDiff.getParameters().put("-mtime", "" + modTimeEntry.getLastModificationTimeMs()); + metaStore.insertFileDiff(fileDiff); + } + String modifySql = String.format( + "UPDATE file SET modification_time = %s WHERE fid = '%s';", + modTimeEntry.getLastModificationTimeMs(), + modTimeEntry.getId()); + return Collections.singletonList(modifySql); + } else if (entry.hasPersistDirectory()) { + LOG.trace("entry type:" + entry.getPersistDirectory().getClass() + + ", id:" + entry.getPersistDirectory().getId()); + PersistDirectoryEntry typedEntry = entry.getPersistDirectory(); + LOG.debug("Persist directory id " + typedEntry.getId()); + return Collections.singletonList(""); + } else if (entry.hasSetAttribute()) { + LOG.trace("entry type:" + entry.getSetAttribute().getClass() + + ", id:" + entry.getSetAttribute().getId()); + String setAttrSql = setAttributeFromEntry(entry.getSetAttribute()); + return Collections.singletonList(setAttrSql); + } else if (entry.hasRename()) { + LOG.trace("entry type:" + entry.getRename().getClass() + + ", id:" + entry.getRename().getId()); + return renameFromEntry(entry.getRename()); + } else if (entry.hasDeleteFile()) { + LOG.trace("entry type:" + entry.getDeleteFile().getClass() + + ", id:" + entry.getDeleteFile().getId()); + String delSql = deleteFromEntry(entry.getDeleteFile()); + return Collections.singletonList(delSql); + } else if (entry.hasAddMountPoint()) { + LOG.trace("entry type:" + entry.getAddMountPoint().getClass() + + ", alluxio path:" + entry.getAddMountPoint().getAlluxioPath() + + ", ufs path:" + entry.getAddMountPoint().getUfsPath()); + return Collections.singletonList(mountFromEntry(entry.getAddMountPoint())); + } else if (entry.hasDeleteMountPoint()) { + LOG.trace("entry type:" + entry.getDeleteMountPoint().getClass() + + ", alluxio path:" + entry.getDeleteMountPoint().getAlluxioPath()); + return Collections.singletonList(unmountFromEntry(entry.getDeleteMountPoint())); + } else if (entry.hasAsyncPersistRequest() + || entry.hasCompleteFile() + || entry.hasInodeDirectoryIdGenerator() + || entry.hasReinitializeFile()) { + //Do nothing + } else { + throw new IOException(ExceptionMessage.UNEXPECTED_JOURNAL_ENTRY.getMessage(entry)); + } + return Collections.emptyList(); + } + + private String addInodeFileFromEntry(InodeFileEntry inodeFileEntry) throws MetaStoreException { + String inodePath = getPathFromInodeFile(inodeFileEntry); + URIStatus status = null; + try { + status = fs.getStatus(new AlluxioURI(inodePath)); + } catch (IOException | AlluxioException e) { + e.printStackTrace(); + } + FileInfo fileInfo = AlluxioUtil.convertFileStatus(status); + if (inBackup(fileInfo.getPath())) { + FileDiff fileDiff = new FileDiff(FileDiffType.APPEND); + fileDiff.setSrc(fileInfo.getPath()); + fileDiff.getParameters().put("-offset", String.valueOf(0)); + // Note that "-length 0" means create an empty file + fileDiff.getParameters() + .put("-length", String.valueOf(fileInfo.getLength())); + //add modification_time and access_time to filediff + fileDiff.getParameters().put("-mtime", "" + fileInfo.getModificationTime()); + //add owner to filediff + fileDiff.getParameters().put("-owner", "" + fileInfo.getOwner()); + fileDiff.getParameters().put("-group", "" + fileInfo.getGroup()); + //add Permission to filediff + fileDiff.getParameters().put("-permission", "" + fileInfo.getPermission()); + metaStore.insertFileDiff(fileDiff); + } + metaStore.insertFile(fileInfo); + return ""; + } + + private String setAttributeFromEntry(SetAttributeEntry setAttrEntry) throws MetaStoreException { + String path = getPathByFileId(setAttrEntry.getId()); + FileDiff fileDiff = null; + if (inBackup(path)) { + fileDiff = new FileDiff(FileDiffType.METADATA); + fileDiff.setSrc(path); + } + if (setAttrEntry.hasPinned()) { + LOG.debug(String.format("File %s is pinned %s", setAttrEntry.getId(), setAttrEntry.getPinned())); + //Todo + } else if (setAttrEntry.hasTtl()) { + LOG.debug(String.format("File %s has ttl %s with ttlAction %s", setAttrEntry.getId(), setAttrEntry.getTtl(), setAttrEntry.getTtlAction())); + //Todo + } else if (setAttrEntry.hasPersisted()) { + LOG.debug(String.format("File %s is persisted %s", setAttrEntry.getId(), setAttrEntry.getPersisted())); + //Todo + } else if (setAttrEntry.hasOwner()) { + if (fileDiff != null) { + fileDiff.getParameters().put("-owner", "" + setAttrEntry.getOwner()); + metaStore.insertFileDiff(fileDiff); + } + //Todo + } else if (setAttrEntry.hasGroup()) { + if (fileDiff != null) { + fileDiff.getParameters().put("-group", "" + setAttrEntry.getGroup()); + metaStore.insertFileDiff(fileDiff); + } + //Todo + } else if (setAttrEntry.hasPermission()) { + if (fileDiff != null) { + fileDiff.getParameters().put("-permission", "" + (short)setAttrEntry.getPermission()); + metaStore.insertFileDiff(fileDiff); + } + return String.format( + "UPDATE file SET permission = %s WHERE path = '%s';", + (short)setAttrEntry.getPermission(), path); + } + return ""; + } + + private List renameFromEntry(RenameEntry renameEntry) throws MetaStoreException { + List sqlist = new ArrayList<>(); + URIStatus dStatus = null; + try { + dStatus = fs.getStatus(new AlluxioURI(renameEntry.getDstPath())); + } catch (IOException | AlluxioException e) { + e.printStackTrace(); + } + if (dStatus == null) { + LOG.debug("Get rename dest status failed, {}", renameEntry.getDstPath()); + } + FileInfo fileInfo = metaStore.getFile(renameEntry.getId()); + if (fileInfo == null) { + if (dStatus != null) { + fileInfo = AlluxioUtil.convertFileStatus(dStatus); + metaStore.insertFile(fileInfo); + } + } else { + FileDiff fileDiff = new FileDiff(FileDiffType.RENAME); + String srcPath = fileInfo.getPath(); + if (inBackup(srcPath)) { + fileDiff.setSrc(srcPath); + fileDiff.getParameters().put("-dest", renameEntry.getDstPath()); + metaStore.insertFileDiff(fileDiff); + } + sqlist.add(String.format("UPDATE file SET path = replace(path, '%s', '%s') WHERE path = '%s';", + srcPath, renameEntry.getDstPath(), srcPath)); + if (fileInfo.isdir()) { + sqlist.add(String.format("UPDATE file SET path = replace(path, '%s', '%s') WHERE path LIKE '%s/%%';", + srcPath, renameEntry.getDstPath(), srcPath)); + } + } + + return sqlist; + } + + private String deleteFromEntry(DeleteFileEntry deleteFileEntry) throws MetaStoreException { + String path = getPathByFileId(deleteFileEntry.getId()); + if (inBackup(path)) { + FileDiff fileDiff = new FileDiff(FileDiffType.DELETE); + fileDiff.setSrc(path); + metaStore.insertFileDiff(fileDiff); + } + return String.format("DELETE FROM file WHERE fid =%s;", deleteFileEntry.getId()); + } + + private String mountFromEntry(AddMountPointEntry mountPointEntry) { + LOG.debug("Add mount alluxio path %s to ufs path %s", + mountPointEntry.getAlluxioPath(), mountPointEntry.getUfsPath()); + return ""; + } + + private String unmountFromEntry(DeleteMountPointEntry unmountEntry) { + LOG.debug("Delete mount alluxio path %s", unmountEntry.getAlluxioPath()); + return ""; + } + + private boolean inBackup(String src) throws MetaStoreException { + return metaStore.srcInbackup(src); + } + + public String getPathFromInodeFile(InodeFileEntry fileEntry) throws MetaStoreException { + long pid = fileEntry.getParentId(); + String fName = fileEntry.getName(); + FileInfo fileInfo = metaStore.getFile(pid); + String pPath = ""; + if (fileInfo != null) { + pPath = formatPath(fileInfo.getPath()); + } + return pPath.concat(fName); + } + + public String getPathFromInodeDir(InodeDirectoryEntry dirEntry) throws MetaStoreException { + long pid = dirEntry.getParentId(); + String dName = dirEntry.getName(); + FileInfo fileInfo = metaStore.getFile(pid); + String pDir = ""; + if (fileInfo != null) { + pDir = formatPath(fileInfo.getPath()); + } + return pDir.concat(dName); + } + + public String getPathByFileId(long fid) throws MetaStoreException { + FileInfo fileInfo = metaStore.getFile(fid); + String path = ""; + if (fileInfo != null) { + path = fileInfo.getPath(); + } + return path; + } + + private String formatPath(String path) { + if (!path.endsWith(AlluxioURI.SEPARATOR)) { + path += AlluxioURI.SEPARATOR; + } + return path; + } +} diff --git a/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioEntryFetchAndApplyTask.java b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioEntryFetchAndApplyTask.java new file mode 100644 index 00000000000..c5c83c2b4e0 --- /dev/null +++ b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioEntryFetchAndApplyTask.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.alluxio.metric.fetcher; + +import alluxio.master.journal.JournalReader; +import alluxio.master.journal.ufs.AlluxioJournalUtil; +import alluxio.proto.journal.Journal.JournalEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartdata.SmartConstants; +import org.smartdata.conf.SmartConf; +import org.smartdata.metastore.MetaStore; +import org.smartdata.model.SystemInfo; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; + +public class AlluxioEntryFetchAndApplyTask implements Runnable { + + public static final Logger LOG = LoggerFactory.getLogger(AlluxioEntryFetchAndApplyTask.class); + + private final AtomicLong lastSn; + private final MetaStore metaStore; + private final AlluxioEntryApplier entryApplier; + private JournalReader journalReader; + + public AlluxioEntryFetchAndApplyTask(SmartConf conf, MetaStore metaStore, AlluxioEntryApplier entryApplier, long startSn) { + this.metaStore = metaStore; + this.entryApplier = entryApplier; + this.lastSn = new AtomicLong(startSn); + this.journalReader = AlluxioJournalUtil.getJournalReaderFromSn(conf, startSn + 1); + } + + + @Override + public void run() { + LOG.trace("AlluxioEntryFetchAndApplyTask run at " + new Date()); + try { + JournalEntry journalEntry = journalReader.read(); + while (journalEntry != null) { + entryApplier.apply(journalEntry); + lastSn.getAndSet(journalEntry.getSequenceNumber()); + metaStore.updateAndInsertIfNotExist( + new SystemInfo( + SmartConstants.SMART_ALLUXIO_LAST_ENTRY_SN, String.valueOf(lastSn.get()))); + journalEntry = journalReader.read(); + } + } catch (Throwable t) { + LOG.error("Alluxio Entry Apply Events error", t); + } + } + + public long getLastSn() { + return this.lastSn.get(); + } +} diff --git a/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioEntryFetcher.java b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioEntryFetcher.java new file mode 100644 index 00000000000..210f8ef4f44 --- /dev/null +++ b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioEntryFetcher.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.alluxio.metric.fetcher; + +import alluxio.client.file.FileSystem; +import alluxio.exception.InvalidJournalEntryException; +import alluxio.master.journal.JournalReader; +import alluxio.master.journal.ufs.AlluxioJournalUtil; +import alluxio.proto.journal.Journal.JournalEntry; +import com.google.common.util.concurrent.*; +import com.squareup.tape.QueueFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartdata.SmartConstants; +import org.smartdata.conf.SmartConf; +import org.smartdata.conf.SmartConfKeys; +import org.smartdata.metastore.MetaStore; +import org.smartdata.metastore.MetaStoreException; +import org.smartdata.model.SystemInfo; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class AlluxioEntryFetcher { + + public static final Logger LOG = LoggerFactory.getLogger(AlluxioEntryFetcher.class); + + private final FileSystem fileSystem; + private final AlluxioNamespaceFetcher alluxioNamespaceFetcher; + private final ScheduledExecutorService scheduledExecutorService; + private final AlluxioEntryApplier alluxioEntryApplier; + private final MetaStore metaStore; + private Callable finishedCallback; + private ScheduledFuture entryFetchFuture; + private ScheduledFuture entryFetchAndApplyFuture; + private AlluxioEntryApplyTask entryApplyTask; + private File entryInotifyFile; + private QueueFile entryQueueFile; + private static SmartConf conf; + + public AlluxioEntryFetcher(FileSystem fileSystem, MetaStore metaStore, ScheduledExecutorService scheduledExecutorService, + Callable finishedCallback) { + this(fileSystem, metaStore, scheduledExecutorService, + new AlluxioEntryApplier(metaStore, fileSystem), finishedCallback, new SmartConf()); + } + + public AlluxioEntryFetcher(FileSystem fileSystem, MetaStore metaStore, ScheduledExecutorService scheduledExecutorService, + Callable finishedCallback, SmartConf conf) { + this(fileSystem, metaStore, scheduledExecutorService, + new AlluxioEntryApplier(metaStore, fileSystem), finishedCallback, conf); + } + + public AlluxioEntryFetcher(FileSystem fileSystem, MetaStore metaStore, ScheduledExecutorService scheduledExecutorService, + AlluxioEntryApplier alluxioEntryApplier, Callable finishedCallback) { + this.fileSystem = fileSystem; + this.scheduledExecutorService = scheduledExecutorService; + this.alluxioEntryApplier = alluxioEntryApplier; + this.metaStore = metaStore; + this.finishedCallback = finishedCallback; + this.conf = new SmartConf(); + this.alluxioNamespaceFetcher = new AlluxioNamespaceFetcher(fileSystem, metaStore, scheduledExecutorService); + } + + public AlluxioEntryFetcher(FileSystem fileSystem, MetaStore metaStore, ScheduledExecutorService scheduledExecutorService, + AlluxioEntryApplier alluxioEntryApplier, Callable finishedCallback, SmartConf conf) { + this.fileSystem = fileSystem; + this.scheduledExecutorService = scheduledExecutorService; + this.alluxioEntryApplier = alluxioEntryApplier; + this.metaStore = metaStore; + this.finishedCallback = finishedCallback; + this.conf = conf; + this.alluxioNamespaceFetcher = new AlluxioNamespaceFetcher(fileSystem, metaStore, scheduledExecutorService); + } + + public void start() throws IOException { + Long lastSn = getLastSeqNum(); + if (lastSn != null && lastSn != -1 && canReadFromLastSeqNum(lastSn)) { + startFromLastSeqNum(lastSn); + } else { + startWithFetchingAlluxioNameSpace(); + } + } + + public static boolean canReadFromLastSeqNum(Long lastSn) { + try { + if (AlluxioJournalUtil.getCurrentSeqNum(conf) == lastSn) { + return true; + } + JournalReader reader = AlluxioJournalUtil.getJournalReaderFromSn(conf, lastSn); + JournalEntry entry = reader.read(); + return entry != null; + } catch (Exception e) { + return false; + } + } + + private Long getLastSeqNum() { + try { + SystemInfo info = + metaStore.getSystemInfoByProperty(SmartConstants.SMART_ALLUXIO_LAST_ENTRY_SN); + return info != null ? Long.parseLong(info.getValue()) : -1L; + } catch (MetaStoreException e) { + return -1L; + } + } + + private void startFromLastSeqNum(long lastSn) throws IOException { + LOG.info("Skipped fetching Alluxio Name Space, start applying alluxio journal entry from " + lastSn); + submitEntryFetchAndApplyTask(lastSn); + try { + finishedCallback.call(); + } catch (Exception e) { + LOG.error("Call back failed", e); + } + } + + private void submitEntryFetchAndApplyTask(long lastSn) throws IOException { + entryFetchAndApplyFuture = + scheduledExecutorService.scheduleAtFixedRate( + new AlluxioEntryFetchAndApplyTask(conf, metaStore, alluxioEntryApplier, lastSn), + 0, + 100, + TimeUnit.MILLISECONDS); + } + + private void startWithFetchingAlluxioNameSpace() throws IOException { + ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(scheduledExecutorService); + entryInotifyFile = new File("/tmp/entry-inotify-" + new Random().nextLong()); + entryQueueFile = new QueueFile(entryInotifyFile); + long startSn = AlluxioJournalUtil.getCurrentSeqNum(conf); + LOG.info("Start fetching alluxio namespace with current journal entry sequence number = " + startSn); + alluxioNamespaceFetcher.startFetch(); + entryFetchFuture = scheduledExecutorService.scheduleAtFixedRate( + new AlluxioEntryFetchTask(entryQueueFile, conf, startSn), 0, 100, TimeUnit.MILLISECONDS); + entryApplyTask = new AlluxioEntryApplyTask(alluxioNamespaceFetcher, alluxioEntryApplier, entryQueueFile, conf, startSn); + ListenableFuture future = listeningExecutorService.submit(entryApplyTask); + Futures.addCallback(future, new AlluxioNameSpaceFetcherCallBack(), scheduledExecutorService); + LOG.info("Start apply alluxio entry."); + } + + private class AlluxioNameSpaceFetcherCallBack implements FutureCallback { + + @Override + public void onSuccess(@Nullable Object o) { + entryFetchFuture.cancel(false); + alluxioNamespaceFetcher.stop(); + try { + entryQueueFile.close(); + submitEntryFetchAndApplyTask(entryApplyTask.getLastSn()); + LOG.info("Alluxio Namespace fetch finished."); + finishedCallback.call(); + } catch (Exception e) { + LOG.error("Call back failed", e); + } + } + + @Override + public void onFailure(Throwable throwable) { + LOG.error("Alluxio NameSpace fetch failed", throwable); + } + } + + public void stop() { + if (entryInotifyFile != null) { + entryInotifyFile.delete(); + } + if (entryFetchFuture != null) { + entryFetchFuture.cancel(false); + } + if (entryFetchAndApplyFuture != null) { + entryFetchAndApplyFuture.cancel(false); + } + } + + private static class AlluxioEntryFetchTask implements Runnable { + private final QueueFile queueFile; + private JournalReader journalReader; + + public AlluxioEntryFetchTask(QueueFile queueFile, SmartConf conf, long startSn) { + this.queueFile = queueFile; + this.journalReader = AlluxioJournalUtil.getJournalReaderFromSn(conf, startSn + 1); + } + + @Override + public void run() { + try { + JournalEntry journalEntry = journalReader.read(); + while (journalEntry != null) { + byte[] seqEntry = journalEntry.toByteArray(); + this.queueFile.add(seqEntry); + journalEntry = journalReader.read(); + } + } catch (IOException | InvalidJournalEntryException e) { + LOG.error("Alluxio entry enqueue error", e); + } + } + } + + private static class AlluxioEntryApplyTask implements Runnable { + private final AlluxioNamespaceFetcher namespaceFetcher; + private final AlluxioEntryApplier entryApplier; + private final QueueFile queueFile; + private long lastSn; + private SmartConf conf; + private List ignoreList; + + public AlluxioEntryApplyTask(AlluxioNamespaceFetcher namespaceFetcher, AlluxioEntryApplier entryApplier, + QueueFile queueFile, SmartConf conf, long lastSn) { + this.namespaceFetcher = namespaceFetcher; + this.entryApplier = entryApplier; + this.queueFile = queueFile; + this.conf = conf; + this.lastSn = lastSn; + this.ignoreList = getIgnoreDirFromConfig(); + } + + public List getIgnoreDirFromConfig() { + String ignoreDirs = this.conf.get(SmartConfKeys.SMART_IGNORE_DIRS_KEY); + List ignoreList; + if (ignoreDirs == null || ignoreDirs.equals("")) { + ignoreList = new ArrayList<>(); + } else { + ignoreList = Arrays.asList(ignoreDirs.split(",")); + } + for (int i = 0; i < ignoreList.size(); i++) { + if (!ignoreList.get(i).endsWith("/")) { + ignoreList.set(i, ignoreList.get(i).concat("/")); + } + } + return ignoreList; + } + + public boolean fetchPathInIgnoreList(String path) { + if (!path.endsWith("/")) { + path = path.concat("/"); + } + for (int i = 0; i < ignoreList.size(); i++) { + if (path.equals(ignoreList.get(i))) { + return true; + } + } + return false; + } + + public boolean ignoreEntry(JournalEntry entry) throws MetaStoreException { + String inodePath; + if (entry.hasInodeDirectory()) { + inodePath = entryApplier.getPathFromInodeDir(entry.getInodeDirectory()); + return fetchPathInIgnoreList(inodePath); + } else if (entry.hasInodeFile()) { + inodePath = entryApplier.getPathFromInodeFile(entry.getInodeFile()); + return fetchPathInIgnoreList(inodePath); + } else if (entry.hasInodeLastModificationTime()) { + inodePath = entryApplier.getPathByFileId(entry.getInodeLastModificationTime().getId()); + return fetchPathInIgnoreList(inodePath); + } else if (entry.hasPersistDirectory()) { + inodePath = entryApplier.getPathByFileId(entry.getPersistDirectory().getId()); + return fetchPathInIgnoreList(inodePath); + } else if (entry.hasSetAttribute()) { + inodePath = entryApplier.getPathByFileId(entry.getSetAttribute().getId()); + return fetchPathInIgnoreList(inodePath); + } else if (entry.hasRename()) { + inodePath = entryApplier.getPathByFileId(entry.getRename().getId()); + return fetchPathInIgnoreList(inodePath); + } else if (entry.hasDeleteFile()) { + inodePath = entryApplier.getPathByFileId(entry.getDeleteFile().getId()); + return fetchPathInIgnoreList(inodePath); + } else if (entry.hasAddMountPoint()) { + inodePath = entry.getAddMountPoint().getAlluxioPath(); + return fetchPathInIgnoreList(inodePath); + } else if (entry.hasDeleteMountPoint()) { + inodePath = entry.getDeleteMountPoint().getAlluxioPath(); + return fetchPathInIgnoreList(inodePath); + } else if (entry.hasAsyncPersistRequest() + || entry.hasCompleteFile() + || entry.hasInodeDirectoryIdGenerator() + || entry.hasReinitializeFile()) { + return false; + } + return true; + } + + @Override + public void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + if (!namespaceFetcher.fetchFinished()) { + Thread.sleep(100); + } else { + while (!queueFile.isEmpty()) { + JournalEntry entry = JournalEntry.parseFrom(queueFile.peek()); + queueFile.remove(); + if (!ignoreEntry(entry)) { + this.entryApplier.apply(entry); + this.lastSn = entry.getSequenceNumber(); + } + } + break; + } + } + } catch (InterruptedException | IOException | MetaStoreException e) { + LOG.error("Alluxio entry dequeue error", e); + } + } + + public long getLastSn() { + return this.lastSn; + } + } +} diff --git a/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioNamespaceFetcher.java b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioNamespaceFetcher.java index 090899d304b..c43bbcfc745 100644 --- a/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioNamespaceFetcher.java +++ b/smart-alluxio-support/smart-alluxio/src/main/java/org/smartdata/alluxio/metric/fetcher/AlluxioNamespaceFetcher.java @@ -49,6 +49,10 @@ public class AlluxioNamespaceFetcher { public static final Logger LOG = LoggerFactory.getLogger(AlluxioNamespaceFetcher.class); + public AlluxioNamespaceFetcher(FileSystem fs, MetaStore metaStore, ScheduledExecutorService service) { + this(fs, metaStore, DEFAULT_INTERVAL, service); + } + public AlluxioNamespaceFetcher(FileSystem fs, MetaStore metaStore, long fetchInterval, ScheduledExecutorService service) { this.fetchTask = new AlluxioFetchTask(fs); diff --git a/smart-alluxio-support/smart-alluxio/src/test/java/org/smartdata/alluxio/action/metric/fetcher/TestAlluxioEntryApplier.java b/smart-alluxio-support/smart-alluxio/src/test/java/org/smartdata/alluxio/action/metric/fetcher/TestAlluxioEntryApplier.java new file mode 100644 index 00000000000..939826a1182 --- /dev/null +++ b/smart-alluxio-support/smart-alluxio/src/test/java/org/smartdata/alluxio/action/metric/fetcher/TestAlluxioEntryApplier.java @@ -0,0 +1,366 @@ +package org.smartdata.alluxio.action.metric.fetcher; + +import alluxio.AlluxioURI; +import alluxio.client.file.FileSystem; +import alluxio.client.file.URIStatus; +import alluxio.proto.journal.File.*; +import alluxio.proto.journal.Journal.JournalEntry; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.smartdata.alluxio.metric.fetcher.AlluxioEntryApplier; +import org.smartdata.metastore.MetaStore; +import org.smartdata.metastore.TestDaoUtil; +import org.smartdata.model.BackUpInfo; +import org.smartdata.model.FileDiff; +import org.smartdata.model.FileDiffType; +import org.smartdata.model.FileInfo; + +import java.util.List; + +public class TestAlluxioEntryApplier extends TestDaoUtil { + + private MetaStore metaStore; + + @Before + public void setUp() throws Exception { + initDao(); + metaStore = new MetaStore(druidPool); + } + + @After + public void tearDown() throws Exception { + closeDao(); + } + + @Test + public void testInodeDirectoryApplier() throws Exception { + FileSystem fs = Mockito.mock(FileSystem.class); + AlluxioEntryApplier entryApplier = new AlluxioEntryApplier(metaStore, fs); + + FileInfo rootDir = FileInfo.newBuilder() + .setFileId(0) + .setIsdir(true) + .setPath("/") + .build(); + metaStore.insertFile(rootDir); + + alluxio.wire.FileInfo info1 = new alluxio.wire.FileInfo() + .setFileId(1) + .setPath("/dir1") + .setLength(0L) + .setFolder(true) + .setBlockSizeBytes(1000000) + .setLastModificationTimeMs(1528876616216L) + .setCreationTimeMs(1528876616216L) + .setMode(493) + .setOwner("user1") + .setGroup("group1"); + URIStatus status1 = new URIStatus(info1); + Mockito.when(fs.getStatus(new AlluxioURI("/dir1"))).thenReturn(status1); + + InodeDirectoryEntry inodeDirectoryEntry = InodeDirectoryEntry.newBuilder() + .setId(1) + .setParentId(0) + .setName("dir1") + .setPersistenceState("NOT_PERSISTED") + .setPinned(false) + .setCreationTimeMs(1528876616216L) + .setLastModificationTimeMs(1528876616216L) + .setOwner("user1") + .setGroup("group1") + .setMode(493) + .setMountPoint(false) + .setDirectChildrenLoaded(false) + .setTtl(-1L) + .setTtlAction(PTtlAction.DELETE) + .build(); + JournalEntry inodeDirectoryJEntry = JournalEntry.newBuilder() + .setInodeDirectory(inodeDirectoryEntry) + .build(); + entryApplier.apply(inodeDirectoryJEntry); + + Assert.assertTrue(metaStore.getFile().get(0).getPath().equals("/")); + Assert.assertTrue(metaStore.getFile().get(1).getPath().equals("/dir1")); + + Assert.assertEquals("user1", metaStore.getFile("/dir1").getOwner()); + Assert.assertEquals(1528876616216L, metaStore.getFile("/dir1").getModificationTime()); + } + + @Test + public void testInodeFileApplier() throws Exception { + FileSystem fs = Mockito.mock(FileSystem.class); + AlluxioEntryApplier entryApplier = new AlluxioEntryApplier(metaStore, fs); + + FileInfo fooDir = FileInfo.newBuilder() + .setFileId(6) + .setIsdir(true) + .setPath("/foo") + .build(); + metaStore.insertFile(fooDir); + + BackUpInfo backUpInfo = new BackUpInfo(1L, "/foo/foobar1", "remote/dest/", 10); + metaStore.insertBackUpInfo(backUpInfo); + + alluxio.wire.FileInfo info1 = new alluxio.wire.FileInfo() + .setFileId(33554431) + .setPath("/foo/foobar1") + .setLength(10L) + .setFolder(false) + .setBlockSizeBytes(536870912) + .setLastModificationTimeMs(1515665470681L) + .setCreationTimeMs(1515665470681L) + .setMode(420) + .setOwner("user1") + .setGroup("group1"); + URIStatus status1 = new URIStatus(info1); + Mockito.when(fs.getStatus(new AlluxioURI("/foo/foobar1"))).thenReturn(status1); + + InodeFileEntry inodeFileEntry = InodeFileEntry.newBuilder() + .setId(33554431) + .setParentId(6) + .setName("foobar1") + .setPersistenceState("NOT_PERSISTED") + .setPinned(false) + .setCreationTimeMs(1515665470681L) + .setBlockSizeBytes(536870912) + .setLength(10L) + .setCompleted(false) + .setCacheable(true) + .setTtl(-1L) + .setOwner("user1") + .setGroup("group1") + .setMode(420) + .setTtlAction(PTtlAction.DELETE) + .build(); + JournalEntry inodeFileJEntry = JournalEntry.newBuilder() + .setInodeFile(inodeFileEntry) + .build(); + entryApplier.apply(inodeFileJEntry); + + Assert.assertEquals(33554431, metaStore.getFile("/foo/foobar1").getFileId()); + Assert.assertEquals("user1", metaStore.getFile("/foo/foobar1").getOwner()); + Assert.assertEquals(536870912, metaStore.getFile("/foo/foobar1").getBlocksize()); + + List fileDiffs = metaStore.getFileDiffsByFileName("/foo/foobar1"); + + Assert.assertTrue(fileDiffs.size() > 0); + + for (FileDiff fileDiff : fileDiffs) { + if (fileDiff.getDiffType().equals(FileDiffType.APPEND)) { + Assert.assertTrue(fileDiff.getParameters().get("-owner").equals("user1")); + Assert.assertTrue(fileDiff.getParameters().get("-mtime").equals("1515665470681")); + Assert.assertTrue(fileDiff.getParameters().get("-length").equals("10")); + } + } + } + + @Test + public void testInodeLastMTimeApplier() throws Exception { + FileSystem fs = Mockito.mock(FileSystem.class); + AlluxioEntryApplier entryApplier = new AlluxioEntryApplier(metaStore, fs); + + FileInfo fooFile = FileInfo.newBuilder() + .setFileId(5) + .setIsdir(true) + .setPath("/baz") + .build(); + metaStore.insertFile(fooFile); + + BackUpInfo backUpInfo = new BackUpInfo(1L, "/baz", "remote/dest/", 10); + metaStore.insertBackUpInfo(backUpInfo); + + alluxio.wire.FileInfo info1 = new alluxio.wire.FileInfo() + .setFileId(5) + .setPath("/baz") + .setLength(0L) + .setFolder(true) + .setBlockSizeBytes(1000000) + .setLastModificationTimeMs(1515665470681L) + .setCreationTimeMs(1515665470681L) + .setMode(493) + .setOwner("user1") + .setGroup("group1"); + URIStatus status1 = new URIStatus(info1); + Mockito.when(fs.getStatus(new AlluxioURI("/baz"))).thenReturn(status1); + + InodeLastModificationTimeEntry inodeLastMTimeEntry = InodeLastModificationTimeEntry.newBuilder() + .setId(5) + .setLastModificationTimeMs(1515667810911L) + .build(); + + JournalEntry inodeLastMTimeJEntry = JournalEntry.newBuilder() + .setInodeLastModificationTime(inodeLastMTimeEntry) + .build(); + entryApplier.apply(inodeLastMTimeJEntry); + + List fileDiffs = metaStore.getFileDiffsByFileName("/baz"); + + Assert.assertTrue(fileDiffs.size() > 0); + + for (FileDiff fileDiff : fileDiffs) { + if (fileDiff.getDiffType().equals(FileDiffType.METADATA)) { + Assert.assertEquals("/baz", fileDiff.getSrc()); + Assert.assertEquals("1515667810911", fileDiff.getParameters().get("-mtime")); + } + } + } + + @Test + public void testSetAttributeApplier() throws Exception { + FileSystem fs = Mockito.mock(FileSystem.class); + AlluxioEntryApplier entryApplier = new AlluxioEntryApplier(metaStore, fs); + + FileInfo fooFile = FileInfo.newBuilder() + .setFileId(33554431) + .setIsdir(false) + .setPath("/foo/foobar") + .build(); + metaStore.insertFile(fooFile); + + BackUpInfo backUpInfo = new BackUpInfo(1L, "/foo/foobar", "remote/dest/", 10); + metaStore.insertBackUpInfo(backUpInfo); + + alluxio.wire.FileInfo info1 = new alluxio.wire.FileInfo() + .setFileId(33554431) + .setPath("/foo/foobar") + .setLength(100L) + .setFolder(false) + .setBlockSizeBytes(210000) + .setLastModificationTimeMs(1515665470681L) + .setCreationTimeMs(1515665470681L) + .setMode(493) + .setOwner("user1") + .setGroup("group1"); + URIStatus status1 = new URIStatus(info1); + Mockito.when(fs.getStatus(new AlluxioURI("/foo/foobar"))).thenReturn(status1); + + SetAttributeEntry setAttributeEntry = SetAttributeEntry.newBuilder() + .setId(33554431) + .setOpTimeMs(1515667208590658L) + .setPermission(511) + .build(); + + JournalEntry setAttributeJEntry = JournalEntry.newBuilder() + .setSetAttribute(setAttributeEntry) + .build(); + entryApplier.apply(setAttributeJEntry); + + List fileDiffs = metaStore.getFileDiffsByFileName("/foo/foobar"); + + Assert.assertTrue(fileDiffs.size() > 0); + + for (FileDiff fileDiff : fileDiffs) { + if (fileDiff.getDiffType().equals(FileDiffType.METADATA)) { + Assert.assertEquals("511", fileDiff.getParameters().get("-permission")); + } + } + } + + @Test + public void testRenameApplier() throws Exception { + FileSystem fs = Mockito.mock(FileSystem.class); + AlluxioEntryApplier entryApplier = new AlluxioEntryApplier(metaStore, fs); + + FileInfo fooFile = FileInfo.newBuilder() + .setFileId(50331647) + .setIsdir(false) + .setPath("/bar/foobar1") + .build(); + metaStore.insertFile(fooFile); + + BackUpInfo backUpInfo = new BackUpInfo(1L, "/bar/foobar1", "remote/dest/", 10); + metaStore.insertBackUpInfo(backUpInfo); + + alluxio.wire.FileInfo info1 = new alluxio.wire.FileInfo() + .setFileId(50331647) + .setPath("/bar/foobar1") + .setLength(300L) + .setFolder(false) + .setBlockSizeBytes(310000) + .setLastModificationTimeMs(1515665270681L) + .setCreationTimeMs(1515665270681L) + .setMode(493) + .setOwner("user1") + .setGroup("group1"); + URIStatus status1 = new URIStatus(info1); + Mockito.when(fs.getStatus(new AlluxioURI("/bar/foobar1"))).thenReturn(status1); + + RenameEntry renameEntry = RenameEntry.newBuilder() + .setId(50331647) + .setOpTimeMs(1515666148444L) + .setDstPath("/bar/foobar1_new") + .build(); + + JournalEntry renameJEntry = JournalEntry.newBuilder() + .setRename(renameEntry) + .build(); + entryApplier.apply(renameJEntry); + + List fileDiffs = metaStore.getFileDiffsByFileName("/bar/foobar1"); + + Assert.assertTrue(fileDiffs.size() > 0); + + for (FileDiff fileDiff : fileDiffs) { + if (fileDiff.getDiffType().equals(FileDiffType.RENAME)) { + Assert.assertEquals("/bar/foobar1", fileDiff.getSrc()); + Assert.assertEquals("/bar/foobar1_new", fileDiff.getParameters().get("-dest")); + } + } + } + + @Test + public void testDeleteFileApplier() throws Exception { + FileSystem fs = Mockito.mock(FileSystem.class); + AlluxioEntryApplier entryApplier = new AlluxioEntryApplier(metaStore, fs); + + FileInfo fooFile = FileInfo.newBuilder() + .setFileId(100663295) + .setIsdir(false) + .setPath("/foo/foobar_del") + .build(); + metaStore.insertFile(fooFile); + + BackUpInfo backUpInfo = new BackUpInfo(1L, "/foo/foobar_del", "remote/dest/", 10); + metaStore.insertBackUpInfo(backUpInfo); + + alluxio.wire.FileInfo info1 = new alluxio.wire.FileInfo() + .setFileId(100663295) + .setPath("/foo/foobar_del") + .setLength(500L) + .setFolder(false) + .setBlockSizeBytes(510000) + .setLastModificationTimeMs(1515665270681L) + .setCreationTimeMs(1515665270681L) + .setMode(493) + .setOwner("user1") + .setGroup("group1"); + URIStatus status1 = new URIStatus(info1); + Mockito.when(fs.getStatus(new AlluxioURI("/foo/foobar_del"))).thenReturn(status1); + + DeleteFileEntry deleteFileEntry = DeleteFileEntry.newBuilder() + .setId(100663295) + .setOpTimeMs(1515737580798L) + .setAlluxioOnly(true) + .setRecursive(false) + .build(); + + JournalEntry deleteFileJEntry = JournalEntry.newBuilder() + .setDeleteFile(deleteFileEntry) + .build(); + entryApplier.apply(deleteFileJEntry); + + List fileDiffs = metaStore.getFileDiffsByFileName("/foo/foobar_del"); + + Assert.assertTrue(fileDiffs.size() > 0); + + for (FileDiff fileDiff : fileDiffs) { + if (fileDiff.getDiffType().equals(FileDiffType.DELETE)) { + Assert.assertEquals("/foo/foobar_del", fileDiff.getSrc()); + } + } + } + +} diff --git a/smart-alluxio-support/smart-alluxio/src/test/java/org/smartdata/alluxio/action/metric/fetcher/TestAlluxioEntryFetcher.java b/smart-alluxio-support/smart-alluxio/src/test/java/org/smartdata/alluxio/action/metric/fetcher/TestAlluxioEntryFetcher.java new file mode 100644 index 00000000000..a5e4b73db88 --- /dev/null +++ b/smart-alluxio-support/smart-alluxio/src/test/java/org/smartdata/alluxio/action/metric/fetcher/TestAlluxioEntryFetcher.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.smartdata.alluxio.action.metric.fetcher; + +import alluxio.AlluxioURI; +import alluxio.Configuration; +import alluxio.PropertyKey; +import alluxio.client.WriteType; +import alluxio.client.file.FileSystem; +import alluxio.client.file.FileSystemTestUtils; +import alluxio.client.file.options.*; +import alluxio.master.LocalAlluxioCluster; +import alluxio.master.journal.JournalUtils; +import alluxio.proto.journal.Journal.JournalEntry; +import alluxio.security.authorization.Mode; +import alluxio.underfs.UnderFileSystem; +import alluxio.util.io.PathUtils; +import org.junit.*; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartdata.SmartConstants; +import org.smartdata.alluxio.metric.fetcher.AlluxioEntryApplier; +import org.smartdata.alluxio.metric.fetcher.AlluxioEntryFetcher; +import org.smartdata.conf.SmartConf; +import org.smartdata.conf.SmartConfKeys; +import org.smartdata.metastore.MetaStore; +import org.smartdata.metastore.TestDaoUtil; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; + +public class TestAlluxioEntryFetcher extends TestDaoUtil { + public static final Logger LOG = LoggerFactory.getLogger(TestAlluxioEntryFetcher.class); + + LocalAlluxioCluster localAlluxioCluster; + FileSystem fs; + MetaStore metaStore; + + @Rule + public TemporaryFolder mTemporaryFolder = new TemporaryFolder(); + + @Before + public void setUp() throws Exception { + localAlluxioCluster = new LocalAlluxioCluster(2); + localAlluxioCluster.initConfiguration(); + Configuration.set(PropertyKey.WEB_RESOURCES, + PathUtils.concatPath(System.getProperty("user.dir"), "src/test/webapp")); + localAlluxioCluster.start(); + fs = localAlluxioCluster.getClient(); + initDao(); + metaStore = new MetaStore(druidPool); + } + + @After + public void tearDown() throws Exception { + if (localAlluxioCluster != null) { + localAlluxioCluster.stop(); + } + closeDao(); + } + + @Test + public void testEntryFetcher() throws Exception { + URI journalLocation = JournalUtils.getJournalLocation(); + SmartConf conf = new SmartConf(); + conf.set(SmartConfKeys.SMART_ALLUXIO_MASTER_JOURNAL_DIR_KEY, journalLocation.getPath()); + + EntryApplierForTest entryApplierForTest = new EntryApplierForTest(metaStore, fs); + final AlluxioEntryFetcher entryFetcher = new AlluxioEntryFetcher(fs, metaStore, + Executors.newScheduledThreadPool(2), entryApplierForTest, new Callable() { + @Override + public Object call() throws Exception { + return null; // Do nothing + } + }, conf); + + Assert.assertFalse(AlluxioEntryFetcher.canReadFromLastSeqNum(100L)); + + /** + * Generate such local structure + * ├── foo | + * ├── foobar1 + * └── foobar2 + * ├── bar | + * └── foobar3 + * └── foobar4 + */ + fs.createDirectory(new AlluxioURI("/foo")); + fs.createDirectory(new AlluxioURI("/bar")); + + FileSystemTestUtils.createByteFile(fs, "/foo/foobar1", WriteType.CACHE_THROUGH, 10); + FileSystemTestUtils.createByteFile(fs, "/foo/foobar2", WriteType.CACHE_THROUGH, 20); + FileSystemTestUtils.createByteFile(fs, "/bar/foobar3", WriteType.CACHE_THROUGH, 30); + FileSystemTestUtils.createByteFile(fs, "/foobar4", WriteType.CACHE_THROUGH, 40); + + Thread thread = new Thread() { + public void run() { + try { + entryFetcher.start(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + thread.start(); + + // need wait long enough to finish namespace fetcher + Thread.sleep(10*1000); + + fs.setAttribute(new AlluxioURI("/foo/foobar1"), SetAttributeOptions.defaults().setPersisted(true)); + fs.setAttribute(new AlluxioURI("/foo/foobar2"), SetAttributeOptions.defaults().setPinned(true)); + fs.setAttribute(new AlluxioURI("/bar/foobar3"), SetAttributeOptions.defaults().setTtl(1000000L)); + + String mLocalUfsRoot = mTemporaryFolder.getRoot().getAbsolutePath(); + UnderFileSystem mLocalUfs = UnderFileSystem.Factory.create(mLocalUfsRoot); + String mountpath = PathUtils.concatPath(mLocalUfsRoot, "mtd_ufs"); + mLocalUfs.mkdirs(mountpath); + + Assert.assertTrue(new File(mountpath).exists()); + + fs.mount(new AlluxioURI("/mtd_t"), new AlluxioURI(mountpath), MountOptions.defaults()); + + fs.rename(new AlluxioURI("/foo/foobar1"), new AlluxioURI("/foo/foo1"), RenameOptions.defaults()); + fs.delete(new AlluxioURI("/bar/foobar3"), DeleteOptions.defaults().setRecursive(true)); + + fs.createDirectory(new AlluxioURI("/baz")); + FileSystemTestUtils.createByteFile(fs, "/baz/foobar5", WriteType.CACHE_THROUGH, 50); + + Mode mode = new Mode((short)0755); + fs.setAttribute(new AlluxioURI("/baz/foobar5"), SetAttributeOptions.defaults().setMode(mode)); + + // free action does not generate journal entry + fs.free(new AlluxioURI("/baz"), FreeOptions.defaults().setRecursive(true)); + + while (entryApplierForTest.getEntries().size() != 16) { + Thread.sleep(100); + } + + List entries = entryApplierForTest.getEntries(); + Assert.assertTrue(entries.get(0).hasSetAttribute() && entries.get(0).getSetAttribute().hasPersisted()); + Assert.assertTrue(entries.get(1).hasSetAttribute() && entries.get(1).getSetAttribute().hasPinned()); + Assert.assertTrue(entries.get(2).hasSetAttribute() && entries.get(2).getSetAttribute().hasTtl()); + Assert.assertTrue(entries.get(3).hasInodeLastModificationTime()); + Assert.assertTrue(entries.get(4).hasInodeDirectoryIdGenerator()); + Assert.assertTrue(entries.get(5).hasInodeDirectory()); + Assert.assertTrue(entries.get(6).hasAddMountPoint()); + Assert.assertTrue(entries.get(7).hasRename()); + Assert.assertTrue(entries.get(8).hasDeleteFile()); + Assert.assertTrue(entries.get(9).hasInodeLastModificationTime()); + Assert.assertTrue(entries.get(10).hasInodeDirectoryIdGenerator()); + Assert.assertTrue(entries.get(11).hasInodeDirectory()); + Assert.assertTrue(entries.get(12).hasInodeLastModificationTime()); + Assert.assertTrue(entries.get(13).hasInodeFile()); + Assert.assertTrue(entries.get(14).hasCompleteFile()); + Assert.assertTrue(entries.get(15).hasSetAttribute() && entries.get(15).getSetAttribute().hasPermission()); + + entryFetcher.stop(); + + Assert.assertTrue(metaStore.containSystemInfo(SmartConstants.SMART_ALLUXIO_LAST_ENTRY_SN)); + Assert.assertTrue(AlluxioEntryFetcher.canReadFromLastSeqNum( + Long.parseLong(metaStore.getSystemInfoByProperty(SmartConstants.SMART_ALLUXIO_LAST_ENTRY_SN).getValue()))); + } + + private static class EntryApplierForTest extends AlluxioEntryApplier { + private List entries = new ArrayList<>(); + + public EntryApplierForTest(MetaStore metaStore, FileSystem fs) { + super(metaStore, fs); + } + + @Override + public void apply(JournalEntry entry) { + entries.add(entry); + } + + public List getEntries() { + return entries; + } + } + +} diff --git a/smart-common/src/main/java/org/smartdata/SmartConstants.java b/smart-common/src/main/java/org/smartdata/SmartConstants.java index 14e3e6a0fcd..9206d43c1f8 100644 --- a/smart-common/src/main/java/org/smartdata/SmartConstants.java +++ b/smart-common/src/main/java/org/smartdata/SmartConstants.java @@ -33,6 +33,9 @@ public class SmartConstants { public static final String SMART_HADOOP_LAST_INOTIFY_TXID = "smart_hadoop_last_inotify_txid"; + public static final String SMART_ALLUXIO_LAST_ENTRY_SN = + "smart_alluxio_last_entry_sn"; + public static final String SMART_CLIENT_PROTOCOL_NAME = "org.smartdata.protocol.SmartClientProtocol"; diff --git a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java index 03acd91d6cb..95c3209a52a 100644 --- a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java +++ b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java @@ -52,6 +52,8 @@ public class SmartConfKeys { // Configure keys for Alluxio public static final String SMART_ALLUXIO_MASTER_HOSTNAME_KEY = "smart.alluxio.master.hostname"; public static final String SMART_ALLUXIO_CONF_DIR_KEY = "smart.alluxio.conf.dir"; + public static final String SMART_ALLUXIO_MASTER_JOURNAL_DIR_KEY = + "smart.alluxio.master.journal.dir"; // SSM public static final String SMART_SERVER_RPC_ADDRESS_KEY = "smart.server.rpc.address";