Skip to content

Commit

Permalink
Merge pull request vespa-engine#9886 from vespa-engine/balder/contino…
Browse files Browse the repository at this point in the history
…usly-drop-logfile-from-memory-as-you-compress-it

Drop file continously from cache as we read it.
  • Loading branch information
baldersheim authored Jun 25, 2019
2 parents c3223ae + b50300f commit f36d37d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
package com.yahoo.container.logging;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.container.core.AccessLogConfig;
import com.yahoo.io.NativeIO;
import com.yahoo.log.LogFileDb;
import com.yahoo.system.ProcessExecuter;
import com.yahoo.yolean.Exceptions;

import java.io.File;
import java.io.FileInputStream;
Expand All @@ -22,8 +22,6 @@
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.StreamHandler;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream;

/**
Expand All @@ -41,15 +39,15 @@ public class LogFileHandler extends StreamHandler {
private final boolean compressOnRotation;
private long[] rotationTimes = {0}; //default to one log per day, at midnight
private String filePattern = "./log.%T"; // default to current directory, ms time stamp
private long lastRotationTime = -1; // absolute time (millis since epoch) of current file start
private int numberOfRecords = -1;
private long nextRotationTime = 0;
private OutputStream currentOutputStream = null;
private FileOutputStream currentOutputStream = null;
private String fileName;
private String symlinkName = null;
private ArrayBlockingQueue<LogRecord> logQueue = new ArrayBlockingQueue<>(100000);
private LogRecord rotateCmd = new LogRecord(Level.SEVERE, "rotateNow");
private ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("logfilehandler.compression"));
private final NativeIO nativeIO = new NativeIO();
private long lastDropPosition = 0;

static private class LogThread extends Thread {
LogFileHandler logFileHandler;
Expand Down Expand Up @@ -122,7 +120,21 @@ public void publish(LogRecord r) {
}
}

private void internalPublish(LogRecord r) throws InterruptedException {
@Override
public synchronized void flush() {
super.flush();
try {
if (currentOutputStream != null) {
long newPos = currentOutputStream.getChannel().position();
nativeIO.dropPartialFileFromCache(currentOutputStream.getFD(), lastDropPosition, newPos, true);
lastDropPosition = newPos;
}
} catch (IOException e) {
logger.warning("Failed dropping from cache : " + Exceptions.toMessageString(e));
}
}

private void internalPublish(LogRecord r) {
// first check to see if new file needed.
// if so, use this.internalRotateNow() to do it

Expand All @@ -133,8 +145,6 @@ private void internalPublish(LogRecord r) throws InterruptedException {
if (now > nextRotationTime || currentOutputStream == null) {
internalRotateNow();
}
// count records, and publish
numberOfRecords++;
super.publish(r);
}

Expand Down Expand Up @@ -177,9 +187,9 @@ long getNextRotationTime (long now) {
}
long nowTod = timeOfDayMillis(now);
long next = 0;
for (int i = 0; i<rotationTimes.length; i++) {
if (nowTod < rotationTimes[i]) {
next = rotationTimes[i]-nowTod + now;
for (long rotationTime : rotationTimes) {
if (nowTod < rotationTime) {
next = rotationTime-nowTod + now;
break;
}
}
Expand All @@ -197,7 +207,7 @@ void waitDrained() {
} catch (InterruptedException e) {
}
}
super.flush();
flush();
}

private void checkAndCreateDir(String pathname) {
Expand All @@ -220,21 +230,22 @@ void rotateNow () {

// Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as
// isInterrupted() returns false after interruption in p.waitFor
private void internalRotateNow() throws InterruptedException {
private void internalRotateNow() {
// figure out new file name, then
// use super.setOutputStream to switch to a new file

String oldFileName = fileName;
long now = System.currentTimeMillis();
fileName = LogFormatter.insertDate(filePattern, now);
super.flush();
flush();
super.close();

try {
checkAndCreateDir(fileName);
FileOutputStream os = new FileOutputStream(fileName, true); // append mode, for safety
super.setOutputStream(os);
currentOutputStream = os;
lastDropPosition = 0;
LogFileDb.nowLoggingTo(fileName);
}
catch (IOException e) {
Expand All @@ -243,37 +254,37 @@ private void internalRotateNow() throws InterruptedException {

createSymlinkToCurrentFile();

numberOfRecords = 0;
lastRotationTime = now;
nextRotationTime = 0; //figure it out later (lazy evaluation)
if ((oldFileName != null)) {
File oldFile = new File(oldFileName);
if (oldFile.exists()) {
if (compressOnRotation) {
executor.execute(() -> runCompression(oldFile));
} else {
NativeIO nativeIO = new NativeIO();
nativeIO.dropFileFromCache(oldFile);
}
}
}
}

private void runCompression(File oldFile) {

static void runCompression(File oldFile) {
File gzippedFile = new File(oldFile.getPath() + ".gz");
try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000);
FileInputStream inputStream = new FileInputStream(oldFile))
{
byte [] buffer = new byte[0x100000];
byte [] buffer = new byte[0x400000]; // 4M buffer

long totalBytesRead = 0;
NativeIO nativeIO = new NativeIO();
for (int read = inputStream.read(buffer); read > 0; read = inputStream.read(buffer)) {
compressor.write(buffer, 0, read);
nativeIO.dropPartialFileFromCache(inputStream.getFD(), totalBytesRead, read, false);
totalBytesRead += read;
}
compressor.finish();
compressor.flush();

NativeIO nativeIO = new NativeIO();
nativeIO.dropFileFromCache(oldFile); // Drop from cache in case somebody else has a reference to it preventing from dying quickly.
oldFile.delete();
nativeIO.dropFileFromCache(gzippedFile);
} catch (IOException e) {
Expand Down Expand Up @@ -306,28 +317,6 @@ private void createSymlinkToCurrentFile() {
}
}

/**
* Name the current file to "name.n" where n
* 1+ the largest integer in existing file names
*/
private void moveCurrentFile() {
File file=new File(fileName);
if ( ! file.exists()) return; // no current file
File dir=file.getParentFile();
Pattern logFilePattern=Pattern.compile(".*\\.(\\d+)");
long largestN=0;
for (File existingFile : dir.listFiles()) {
Matcher matcher=logFilePattern.matcher(existingFile.getName());
if (!matcher.matches()) continue;
long thisN=Long.parseLong(matcher.group(1));
if (thisN>largestN)
largestN=thisN;
}
File newFn = new File(dir, file.getName() + "." + (largestN + 1));
LogFileDb.nowLoggingTo(newFn.getAbsolutePath());
file.renameTo(newFn);
}

/**
* Calculate rotation times array, given times in minutes, as "0 60 ..."
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.yahoo.container.logging;

import java.io.File;

public class CompressWhileDrop {
public static void main(String [] args) {
System.out.println("Start compressing file " + args[0]);
LogFileHandler.runCompression(new File(args[0]));
}
}
23 changes: 16 additions & 7 deletions vespajlib/src/main/java/com/yahoo/io/NativeIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,28 @@ public Throwable getError() {
}

/**
* Will hint the OS that this is will not be accessed again and should hence be dropped from the buffer cache.
* Will hint the OS that data read so far will not be accessed again and should hence be dropped from the buffer cache.
* @param fd The file descriptor to drop from buffer cache.
*/
public void dropFileFromCache(FileDescriptor fd) {
try {
fd.sync();
} catch (SyncFailedException e) {
logger.warning("Sync failed while dropping cache: " + e.getMessage());
public void dropPartialFileFromCache(FileDescriptor fd, long offset, long len, boolean sync) {
if (sync) {
try {
fd.sync();
} catch (SyncFailedException e) {
logger.warning("Sync failed while dropping cache: " + e.getMessage());
}
}
if (initialized) {
posix_fadvise(getNativeFD(fd), 0, 0, POSIX_FADV_DONTNEED);
posix_fadvise(getNativeFD(fd), offset, len, POSIX_FADV_DONTNEED);
}
}
/**
* Will hint the OS that this is will not be accessed again and should hence be dropped from the buffer cache.
* @param fd The file descriptor to drop from buffer cache.
*/
public void dropFileFromCache(FileDescriptor fd) {
dropPartialFileFromCache(fd, 0, 0, true);
}

/**
* Will hint the OS that this is will not be accessed again and should hence be dropped from the buffer cache.
Expand Down

0 comments on commit f36d37d

Please sign in to comment.