Skip to content

Commit

Permalink
support alluxio1.6.1 to get metadata changes inspired by hdfs inotify…
Browse files Browse the repository at this point in the history
…… (#1830)
  • Loading branch information
murong00 authored and taojieterry committed Jun 19, 2018
1 parent 1a5711c commit 2b4aef6
Show file tree
Hide file tree
Showing 13 changed files with 1,502 additions and 13 deletions.
5 changes: 4 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<hadoop.version>2.7.3</hadoop.version>
<alluxio.version>1.5.0</alluxio.version>
<alluxio.version>1.6.1</alluxio.version>
<junit.version>4.11</junit.version>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
Expand Down Expand Up @@ -98,6 +98,9 @@
<!-- TODO remove in future -->
<module>smart-hadoop-support</module>
</modules>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
</profiles>

Expand Down
18 changes: 17 additions & 1 deletion smart-alluxio-support/smart-alluxio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@
<artifactId>alluxio-core-client-fs</artifactId>
<version>${alluxio.version}</version>
</dependency>
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-core-server-common</artifactId>
<version>${alluxio.version}</version>
</dependency>
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-minicluster</artifactId>
Expand Down Expand Up @@ -157,6 +162,17 @@
<artifactId>smart-metastore</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.squareup</groupId>
<artifactId>tape</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.8</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -179,4 +195,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package alluxio.master.journal.ufs;

import alluxio.AlluxioURI;
import alluxio.Configuration;
import alluxio.Constants;
import alluxio.PropertyKey;
import alluxio.master.NoopMaster;
import alluxio.master.journal.JournalReader;
import alluxio.proto.journal.Journal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;


/**
* Util for reading the journal entries given a range of sequence numbers.
*/
public class AlluxioJournalUtil {

public static final Logger LOG = LoggerFactory.getLogger(AlluxioJournalUtil.class);

private static String sMaster = Constants.FILE_SYSTEM_MASTER_NAME;

/**
* @param conf smart configuration
* @return the current entry sequence number
*/
public static Long getCurrentSeqNum(SmartConf conf) {
UfsJournal journal =
new UfsJournalSystem(getJournalLocation(conf), 0).createJournal(new NoopMaster(sMaster));
UfsJournalFile currentLog;
try {
currentLog = UfsJournalSnapshot.getCurrentLog(journal);
} catch (IOException e) {
throw new RuntimeException(e);
}
long sn = -1L;
if (currentLog != null) {
try (JournalReader reader = new UfsJournalReader(journal, currentLog.getStart(), true)) {
Journal.JournalEntry entry;
while ((entry = reader.read()) != null) {
sn = entry.getSequenceNumber();
if (sn >= Long.MAX_VALUE) {
break;
}
}
} catch (Exception e) {
LOG.error("Failed to read next journal entry.", e);
}
}
return sn;
}

/**
* @param conf smart configuration
* @param startSn journal entry sequence number
* @return journal reader
*/
public static JournalReader getJournalReaderFromSn(SmartConf conf, Long startSn) {
UfsJournal journal =
new UfsJournalSystem(getJournalLocation(conf), 0).createJournal(new NoopMaster(sMaster));
JournalReader reader = new UfsJournalReader(journal, startSn, true);
return reader;
}

/**
* @param conf smart configuration
* @return the journal location
*/
private static URI getJournalLocation(SmartConf conf) {
String alluxioMasterJournalDir = conf.get(
SmartConfKeys.SMART_ALLUXIO_MASTER_JOURNAL_DIR_KEY, "/opt/alluxio/journal");
Configuration.set(PropertyKey.MASTER_JOURNAL_FOLDER, alluxioMasterJournalDir);
String journalDirectory = Configuration.get(PropertyKey.MASTER_JOURNAL_FOLDER);
if (!journalDirectory.endsWith(AlluxioURI.SEPARATOR)) {
journalDirectory += AlluxioURI.SEPARATOR;
}
try {
return new URI(journalDirectory);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,42 @@
*/
package org.smartdata.alluxio;

import alluxio.AlluxioURI;
import alluxio.client.WriteType;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.options.CreateFileOptions;
import alluxio.client.file.options.DeleteOptions;
import alluxio.exception.AlluxioException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.alluxio.metric.fetcher.AlluxioNamespaceFetcher;
import org.smartdata.alluxio.metric.fetcher.AlluxioEntryFetcher;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.StatesUpdateService;

import alluxio.client.file.FileSystem;

import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
* Polls metrics and events from Alluxio Server
*/
public class AlluxioStatesUpdateService extends StatesUpdateService {

private static final String ALLUXIO_MOVER_ID_PATH = "/system/alluxio-mover.id";
private volatile boolean inSafeMode;
private FileSystem alluxioFs;
private ScheduledExecutorService executorService;
private AlluxioNamespaceFetcher namespaceFetcher;
private AlluxioEntryFetcher alluxioEntryFetcher;
private FileOutStream moverIdOutStream;

public static final Logger LOG =
LoggerFactory.getLogger(AlluxioStatesUpdateService.class);

public AlluxioStatesUpdateService(SmartContext context, MetaStore metaStore) {
super(context, metaStore);
this.inSafeMode = true;
}

/**
Expand All @@ -55,29 +65,69 @@ public void init() throws IOException {
LOG.info("Initializing ...");
SmartContext context = getContext();
this.alluxioFs = AlluxioUtil.getAlluxioFs(context);
this.moverIdOutStream = checkAndMarkRunning(alluxioFs);
this.executorService = Executors.newScheduledThreadPool(4);
this.namespaceFetcher = new AlluxioNamespaceFetcher(alluxioFs, metaStore,
AlluxioNamespaceFetcher.DEFAULT_INTERVAL, executorService);
this.alluxioEntryFetcher = new AlluxioEntryFetcher(alluxioFs, metaStore,
executorService, new EntryFetchFinishedCallBack(), context.getConf());
LOG.info("Initialized.");
}

private class EntryFetchFinishedCallBack implements Callable<Object> {
@Override
public Object call() throws Exception {
inSafeMode = false;
return null;
}
}

@Override
public boolean inSafeMode() {
return inSafeMode;
}

/**
* Start daemon threads in StatesManager for function.
*/
@Override
public void start() throws IOException {
LOG.info("Starting ...");
this.namespaceFetcher.startFetch();
this.alluxioEntryFetcher.start();
LOG.info("Started. ");
}

@Override
public void stop() throws IOException {
LOG.info("Stopping ...");
if (this.namespaceFetcher != null) {
this.namespaceFetcher.stop();
if (moverIdOutStream != null) {
try {
moverIdOutStream.close();
} catch (IOException e) {
LOG.debug("Close alluxio 'mover' ID output stream error", e);
}
}
if (alluxioEntryFetcher != null) {
alluxioEntryFetcher.stop();
}
LOG.info("Stopped.");
}

private FileOutStream checkAndMarkRunning(FileSystem fs) throws IOException {
AlluxioURI moverIdPath = new AlluxioURI(ALLUXIO_MOVER_ID_PATH);
try {
if (fs.exists(moverIdPath)) {
// Alluxio does not support append operation (ALLUXIO-25), here just delete it
fs.delete(moverIdPath, DeleteOptions.defaults().setRecursive(true));
}
CreateFileOptions options = CreateFileOptions.defaults().setWriteType(
WriteType.MUST_CACHE);
FileOutStream fos = fs.createFile(moverIdPath, options);
fos.write(InetAddress.getLocalHost().getHostName().getBytes());
fos.flush();
return fos;
} catch (IOException | AlluxioException e) {
LOG.error("Unable to lock alluxio 'mover', please stop alluxio 'mover' first.");
throw new IOException(e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;

import alluxio.client.file.URIStatus;
import org.smartdata.SmartContext;
import org.smartdata.conf.SmartConfKeys;
import org.slf4j.Logger;
Expand All @@ -28,6 +29,7 @@
import alluxio.PropertyKey;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import org.smartdata.model.FileInfo;

/**
* Contain utils related to alluxio cluster.
Expand All @@ -44,6 +46,22 @@ public static FileSystem getAlluxioFs(SmartContext context) throws IOException {
FileSystemContext fsContext = FileSystemContext.create();
return FileSystem.Factory.get(fsContext);
}


public static FileInfo convertFileStatus(URIStatus status) {
FileInfo fileInfo = new FileInfo(
status.getPath(),
status.getFileId(),
status.getLength(),
status.isFolder(),
(short)1,
status.getBlockSizeBytes(),
status.getLastModificationTimeMs(),
status.getCreationTimeMs(),
(short) status.getMode(),
status.getOwner(),
status.getGroup(),
(byte) 0);
return fileInfo;
}

}
Loading

0 comments on commit 2b4aef6

Please sign in to comment.