diff --git a/component/pom.xml b/component/pom.xml
index d7a9de06..6a729376 100644
--- a/component/pom.xml
+++ b/component/pom.xml
@@ -94,6 +94,11 @@
siddhi-execution-list
test
+
+ org.wso2.carbon.analytics
+ org.wso2.carbon.si.metrics.core
+
+
diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileArchiveExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileArchiveExtension.java
index 106f5d10..fdbf6164 100644
--- a/component/src/main/java/io/siddhi/extension/execution/file/FileArchiveExtension.java
+++ b/component/src/main/java/io/siddhi/extension/execution/file/FileArchiveExtension.java
@@ -30,6 +30,8 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.extension.io.file.metrics.FileArchiveMetrics;
+import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
@@ -39,6 +41,7 @@
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.log4j.Logger;
+import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import java.io.BufferedInputStream;
import java.io.File;
@@ -145,6 +148,8 @@ public class FileArchiveExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileArchiveExtension.class);
private Pattern pattern = null;
private int inputExecutorLength;
+ private String siddhiAppName;
+ private FileArchiveMetrics fileArchiveMetrics;
@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
@@ -156,6 +161,18 @@ protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecut
pattern = Pattern.compile(
((ConstantExpressionExecutor) attributeExpressionExecutors[3]).getValue().toString());
}
+ siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
+ if (MetricsDataHolder.getInstance().getMetricService() != null &&
+ MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
+ try {
+ if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
+ Constants.PROMETHEUS_REPORTER_NAME)) {
+ fileArchiveMetrics = new FileArchiveMetrics(siddhiAppName);
+ }
+ } catch (IllegalArgumentException e) {
+ log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
+ }
+ }
return null;
}
@@ -201,14 +218,25 @@ protected Object[] process(Object[] data) {
"Exception occurred when creating the subdirectories for the destination directory " +
destinationDirUriObject.getName().getPath(), e);
}
+ if (fileArchiveMetrics != null) {
+ fileArchiveMetrics.setSource(Utils.getShortFilePath(uri));
+ fileArchiveMetrics.setDestination(Utils.getShortFilePath(destinationDirUri));
+ fileArchiveMetrics.setType(Utils.getShortFilePath(archiveType));
+ fileArchiveMetrics.setTime(System.currentTimeMillis());
+ }
File sourceFile = new File(uri);
String destinationFile = destinationDirUri + sourceFile.getName();
+
if (archiveType.compareToIgnoreCase(ZIP_FILE_EXTENSION) == 0) {
List fileList = new ArrayList<>();
generateFileList(uri, sourceFile, fileList, excludeSubdirectories);
try {
zip(uri, destinationFile, fileList);
+
} catch (IOException e) {
+ if (fileArchiveMetrics != null) {
+ fileArchiveMetrics.getArchiveMetric(0);
+ }
throw new SiddhiAppRuntimeException("IOException occurred when archiving " + uri, e);
}
} else {
@@ -216,13 +244,20 @@ protected Object[] process(Object[] data) {
if (archiveType.compareToIgnoreCase(TAR_FILE_EXTENSION) == 0) {
addToTarArchiveCompression(
getTarArchiveOutputStream(destinationFile), sourceFile, uri);
+
} else {
throw new SiddhiAppRuntimeException("Unsupported archive type: " + archiveType);
}
} catch (IOException e) {
+ if (fileArchiveMetrics != null) {
+ fileArchiveMetrics.getArchiveMetric(0);
+ }
throw new SiddhiAppRuntimeException("Exception occurred when archiving " + uri, e);
}
}
+ if (fileArchiveMetrics != null) {
+ fileArchiveMetrics.getArchiveMetric(1);
+ }
return new Object[0];
}
diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java
index d0b9ab25..21fb41cd 100644
--- a/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java
+++ b/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java
@@ -31,6 +31,8 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.extension.io.file.metrics.FileCopyMetrics;
+import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
@@ -38,6 +40,7 @@
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.Selectors;
import org.apache.log4j.Logger;
+import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import java.io.File;
import java.util.ArrayList;
@@ -126,6 +129,7 @@ public class FileCopyExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileCopyExtension.class);
private Pattern pattern = null;
private int inputExecutorLength;
+ private FileCopyMetrics fileCopyMetrics;
@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
@@ -137,6 +141,18 @@ protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecut
pattern = Pattern.compile(((ConstantExpressionExecutor)
attributeExpressionExecutors[2]).getValue().toString());
}
+ if (MetricsDataHolder.getInstance().getMetricService() != null &&
+ MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
+ try {
+ if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
+ Constants.PROMETHEUS_REPORTER_NAME)) {
+ String siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
+ fileCopyMetrics = new FileCopyMetrics(siddhiAppName);
+ }
+ } catch (IllegalArgumentException e) {
+ log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
+ }
+ }
return null;
}
@@ -222,6 +238,11 @@ public void stop() {
private void copyFileToDestination(FileObject sourceFileObject, String destinationDirUri, Pattern pattern,
FileObject rootSourceFileObject) {
FileObject destinationFileObject = null;
+ if (fileCopyMetrics != null) {
+ fileCopyMetrics.setSource(Utils.getShortFilePath(sourceFileObject.getName().getPath()));
+ fileCopyMetrics.setDestination(Utils.getShortFilePath(destinationDirUri));
+ fileCopyMetrics.setTime(System.currentTimeMillis());
+ }
try {
String fileName = sourceFileObject.getName().getBaseName();
String destinationPath;
@@ -239,7 +260,13 @@ private void copyFileToDestination(FileObject sourceFileObject, String destinati
destinationFileObject.copyFrom(sourceFileObject, Selectors.SELECT_ALL);
destinationFileObject.close();
}
+ if (fileCopyMetrics != null) {
+ fileCopyMetrics.getCopyMetric(1);
+ }
} catch (FileSystemException e) {
+ if (fileCopyMetrics != null) {
+ fileCopyMetrics.getCopyMetric(0);
+ }
throw new SiddhiAppRuntimeException("Exception occurred when doing file operations when copying for " +
"file: " + sourceFileObject.getName().getPath(), e);
} finally {
diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileDeleteExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileDeleteExtension.java
index 4194657a..24c43cce 100644
--- a/component/src/main/java/io/siddhi/extension/execution/file/FileDeleteExtension.java
+++ b/component/src/main/java/io/siddhi/extension/execution/file/FileDeleteExtension.java
@@ -28,6 +28,8 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.extension.io.file.metrics.FileDeleteMetrics;
+import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
@@ -35,6 +37,7 @@
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.Selectors;
import org.apache.log4j.Logger;
+import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import java.util.ArrayList;
import java.util.List;
@@ -67,11 +70,24 @@
)
public class FileDeleteExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileDeleteExtension.class);
+ private FileDeleteMetrics fileDeleteMetrics;
@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
ConfigReader configReader, boolean outputExpectsExpiredEvents,
SiddhiQueryContext siddhiQueryContext) {
+ if (MetricsDataHolder.getInstance().getMetricService() != null &&
+ MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
+ try {
+ if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
+ Constants.PROMETHEUS_REPORTER_NAME)) {
+ String siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
+ fileDeleteMetrics = new FileDeleteMetrics(siddhiAppName);
+ }
+ } catch (IllegalArgumentException e) {
+ log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
+ }
+ }
return null;
}
@@ -93,10 +109,20 @@ protected Object[] process(Object[] data) {
@Override
protected Object[] process(Object data) {
String fileDeletePathUri = (String) data;
+ if (fileDeleteMetrics != null) {
+ fileDeleteMetrics.setSource(fileDeletePathUri);
+ fileDeleteMetrics.setTime(System.currentTimeMillis());
+ }
try {
FileObject rootFileObject = Utils.getFileObject(fileDeletePathUri);
rootFileObject.delete(Selectors.SELECT_ALL);
+ if (fileDeleteMetrics != null) {
+ fileDeleteMetrics.getDeleteMetric(1);
+ }
} catch (FileSystemException e) {
+ if (fileDeleteMetrics != null) {
+ fileDeleteMetrics.getDeleteMetric(0);
+ }
throw new SiddhiAppRuntimeException("Failure occurred when deleting the file " + fileDeletePathUri, e);
}
return new Object[0];
diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java
index a6e67533..f2e6d9f4 100644
--- a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java
+++ b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java
@@ -31,12 +31,15 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.extension.io.file.metrics.FileMoveMetrics;
+import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.log4j.Logger;
+import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import java.io.File;
import java.util.ArrayList;
@@ -124,6 +127,7 @@ public class FileMoveExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileCopyExtension.class);
private Pattern pattern = null;
private int inputExecutorLength;
+ private FileMoveMetrics fileMoveMetrics;
@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
@@ -135,6 +139,18 @@ protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecut
pattern = Pattern.compile(((ConstantExpressionExecutor)
attributeExpressionExecutors[2]).getValue().toString());
}
+ if (MetricsDataHolder.getInstance().getMetricService() != null &&
+ MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
+ try {
+ if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
+ Constants.PROMETHEUS_REPORTER_NAME)) {
+ String siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
+ fileMoveMetrics = new FileMoveMetrics(siddhiAppName);
+ }
+ } catch (IllegalArgumentException e) {
+ log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
+ }
+ }
return null;
}
@@ -218,6 +234,7 @@ private void moveFileToDestination(FileObject sourceFileObject, String destinati
String fileName = sourceFileObject.getName().getBaseName();
String destinationPath;
FileObject destinationFileObject;
+
if (sourceFileObject.isFile()) {
destinationPath = destinationDirUri + File.separator + sourceFileObject.getName().getBaseName();
destinationFileObject = Utils.getFileObject(destinationPath);
@@ -225,11 +242,23 @@ private void moveFileToDestination(FileObject sourceFileObject, String destinati
if (!destinationFolderFileObject.exists()) {
destinationFolderFileObject.createFolder();
}
+ if (fileMoveMetrics != null) {
+ fileMoveMetrics.set_source(Utils.getShortFilePath(sourceFileObject.getName().getPath()));
+ fileMoveMetrics.setDestination(Utils.getShortFilePath(destinationDirUri));
+ fileMoveMetrics.setTime(System.currentTimeMillis());
+ }
+
if (pattern.matcher(fileName).lookingAt()) {
sourceFileObject.moveTo(destinationFileObject);
}
+ if (fileMoveMetrics != null) {
+ fileMoveMetrics.getMoveMetric(1);
+ }
}
} catch (FileSystemException e) {
+ if (fileMoveMetrics != null) {
+ fileMoveMetrics.getMoveMetric(0);
+ }
throw new SiddhiAppRuntimeException("Exception occurred when doing file operations when moving for file: " +
sourceFileObject.getName().getPath(), e);
}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/FileSink.java b/component/src/main/java/io/siddhi/extension/io/file/FileSink.java
index 32038f56..471f1f68 100644
--- a/component/src/main/java/io/siddhi/extension/io/file/FileSink.java
+++ b/component/src/main/java/io/siddhi/extension/io/file/FileSink.java
@@ -18,6 +18,7 @@
package io.siddhi.extension.io.file;
+import com.google.common.base.Stopwatch;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
@@ -32,11 +33,15 @@
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
+import io.siddhi.extension.io.file.metrics.SinkMetrics;
+import io.siddhi.extension.io.file.metrics.StreamStatus;
import io.siddhi.extension.io.file.util.Constants;
+import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.BinaryCarbonMessage;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
+import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.file.connector.sender.VFSClientConnector;
import java.io.UnsupportedEncodingException;
@@ -121,6 +126,8 @@ public class FileSink extends Sink {
private Option uriOption;
private SiddhiAppContext siddhiAppContext;
private boolean addEventSeparator;
+ private String siddhiAppName;
+ private SinkMetrics metrics;
@Override
@@ -135,6 +142,7 @@ public String[] getSupportedDynamicOptions() {
protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder,
ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
this.siddhiAppContext = siddhiAppContext;
+ this.siddhiAppName = siddhiAppContext.getName();
uriOption = optionHolder.validateAndGetOption(Constants.FILE_URI);
String append = optionHolder.validateAndGetStaticValue(Constants.APPEND, Constants.TRUE);
properties = new HashMap<>();
@@ -147,6 +155,18 @@ protected StateFactory init(StreamDefinition streamDefinition, OptionHolder opti
addEventSeparator = optionHolder.isOptionExists(Constants.ADD_EVENT_SEPARATOR) ?
Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(Constants.ADD_EVENT_SEPARATOR)) :
!mapType.equalsIgnoreCase("csv");
+ mapType = Utils.capitalizeFirstLetter(mapType);
+ if (MetricsDataHolder.getInstance().getMetricService() != null &&
+ MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
+ try {
+ if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
+ Constants.PROMETHEUS_REPORTER_NAME)) {
+ metrics = new SinkMetrics(siddhiAppContext.getName(), mapType, streamDefinition.getId());
+ }
+ } catch (IllegalArgumentException e) {
+ log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
+ }
+ }
return null;
}
@@ -157,6 +177,9 @@ protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
public void connect() throws ConnectionUnavailableException {
vfsClientConnector = new VFSClientConnector();
+ if (metrics != null) {
+ metrics.updateMetrics(siddhiAppContext.getExecutorService());
+ }
}
public void disconnect() {
@@ -169,6 +192,10 @@ public void publish(Object payload, DynamicOptions dynamicOptions, State state)
throws ConnectionUnavailableException {
byte[] byteArray = new byte[0];
boolean canBeWritten = true;
+ String uri = uriOption.getValue(dynamicOptions);
+ if (metrics != null) {
+ metrics.setFilePath(uri);
+ }
if (payload instanceof byte[]) {
byteArray = (byte[]) payload;
} else {
@@ -182,18 +209,54 @@ public void publish(Object payload, DynamicOptions dynamicOptions, State state)
} catch (UnsupportedEncodingException e) {
canBeWritten = false;
log.error("Received payload does not support UTF-8 encoding. Hence dropping the event." , e);
+ if (metrics != null) {
+ metrics.getSinkDroppedEvents().inc();
+ }
}
}
if (canBeWritten) {
- String uri = uriOption.getValue(dynamicOptions);
BinaryCarbonMessage binaryCarbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(byteArray), true);
properties.put(Constants.URI, uri);
+ int byteSize = byteArray.length;
try {
- vfsClientConnector.send(binaryCarbonMessage, null, properties);
+ boolean send = vfsClientConnector.send(binaryCarbonMessage, null, properties);
+ if (metrics == null) {
+ return;
+ }
+ if (send) {
+ siddhiAppContext.getExecutorService().execute(() -> {
+ long fileSize = Utils.getFileSize(uri);
+ metrics.getSinkFilesEventCount().inc();
+ metrics.getSinkDroppedEvents();
+ metrics.getWriteBytes().inc(byteSize);
+ metrics.getSinkFileSize().inc(fileSize);
+ String shortenFilePath = Utils.getShortFilePath(uri);
+ boolean added = metrics.getFilesURI().add(shortenFilePath);
+ if (metrics.getSinkFileLastPublishedTimeMap().containsKey(shortenFilePath)) {
+ metrics.getSinkFileLastPublishedTimeMap().replace(shortenFilePath,
+ System.currentTimeMillis());
+ metrics.getSinkFileStatusMap().replace(shortenFilePath, StreamStatus.PROCESSING);
+ } else {
+ metrics.getSinkFileLastPublishedTimeMap().put(shortenFilePath, System.currentTimeMillis());
+ metrics.getSinkFileStatusMap().put(shortenFilePath, StreamStatus.PROCESSING);
+ }
+ metrics.getSinkLinesCount().inc();
+ if (added) {
+ metrics.getSinkElapsedTimeMap().put(shortenFilePath, Stopwatch.createStarted());
+ metrics.setSinkLastPublishedTime();
+ metrics.setSinkElapsedTime(shortenFilePath);
+ metrics.setSinkFileStatusMetrics();
+ }
+ });
+ }
} catch (ClientConnectorException e) {
+ if (metrics != null) {
+ metrics.getSinkFileStatusMap().replace(Utils.getShortFilePath(uri), StreamStatus.RETRY);
+ metrics.getSinkDroppedEvents().inc();
+ }
throw new ConnectionUnavailableException("Writing data into the file " + uri + " failed during the " +
- "execution of '" + siddhiAppContext.getName() + "' SiddhiApp, due to " +
+ "execution of '" + siddhiAppName + "' SiddhiApp, due to " +
e.getMessage(), e);
}
}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/FileSource.java b/component/src/main/java/io/siddhi/extension/io/file/FileSource.java
index 71f918f6..d4dae262 100644
--- a/component/src/main/java/io/siddhi/extension/io/file/FileSource.java
+++ b/component/src/main/java/io/siddhi/extension/io/file/FileSource.java
@@ -34,6 +34,8 @@
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.file.listeners.FileSystemListener;
+import io.siddhi.extension.io.file.metrics.SourceMetrics;
+import io.siddhi.extension.io.file.metrics.StreamStatus;
import io.siddhi.extension.io.file.processors.FileProcessor;
import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.io.file.util.FileSourceConfiguration;
@@ -50,6 +52,7 @@
import org.wso2.carbon.messaging.ServerConnector;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
import org.wso2.carbon.messaging.exceptions.ServerConnectorException;
+import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
import org.wso2.transport.file.connector.sender.VFSClientConnector;
import org.wso2.transport.file.connector.server.FileServerConnector;
import org.wso2.transport.file.connector.server.FileServerConnectorProvider;
@@ -363,6 +366,7 @@ public class FileSource extends Source {
private ConnectionCallback connectionCallback;
private String headerPresent;
private String readOnlyHeader;
+ private SourceMetrics metrics;
private String cronExpression;
@Override
@@ -493,7 +497,18 @@ public StateFactory init(SourceEventListener sourceEventListene
createInitialSourceConf();
updateSourceConf();
getPattern();
-
+ if (MetricsDataHolder.getInstance().getMetricService() != null &&
+ MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
+ try {
+ if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
+ Constants.PROMETHEUS_REPORTER_NAME)) {
+ metrics = new SourceMetrics(siddhiAppContext.getName(), Utils.capitalizeFirstLetter(mode),
+ sourceEventListener.getStreamDefinition().getId());
+ }
+ } catch (IllegalArgumentException e) {
+ log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
+ }
+ }
return () -> new FileSourceState();
}
@@ -506,6 +521,9 @@ public Class[] getOutputEventClasses() {
public void connect(ConnectionCallback connectionCallback, FileSourceState fileSourceState)
throws ConnectionUnavailableException {
this.connectionCallback = connectionCallback;
+ if (metrics != null) {
+ metrics.updateMetrics(siddhiAppContext.getExecutorService());
+ }
updateSourceConf();
deployServers();
}
@@ -684,7 +702,7 @@ private void deployServers() throws ConnectionUnavailableException {
if (dirUri != null) {
Map properties = getFileSystemServerProperties();
FileSystemListener fileSystemListener = new FileSystemListener(sourceEventListener,
- fileSourceConfiguration);
+ fileSourceConfiguration, metrics);
try {
fileSystemServerConnector = fileSystemConnectorFactory.createServerConnector(
siddhiAppContext.getName(), properties, fileSystemListener);
@@ -719,6 +737,13 @@ private void deployServers() throws ConnectionUnavailableException {
if (moveAfterFailure != null) {
properties.put(Constants.MOVE_AFTER_FAILURE_KEY, moveAfterFailure);
}
+ if (metrics != null) {
+ fileSourceConfiguration.setCurrentlyReadingFileURI(fileUri);
+ metrics.setFilePath(fileUri);
+ metrics.getSourceFileStatusMap().putIfAbsent((Utils.getShortFilePath(fileUri)),
+ StreamStatus.CONNECTING);
+ metrics.getStartedTimeMetric(System.currentTimeMillis());
+ }
if (fileSourceConfiguration.isTailingEnabled()) {
if (fileSourceConfiguration.getTailedFileURIMap() == null) {
fileSourceConfiguration.setTailedFileURI(fileUri);
@@ -729,7 +754,7 @@ private void deployServers() throws ConnectionUnavailableException {
FileServerConnectorProvider fileServerConnectorProvider =
fileSourceServiceProvider.getFileServerConnectorProvider();
FileProcessor fileProcessor = new FileProcessor(sourceEventListener,
- fileSourceConfiguration);
+ fileSourceConfiguration, metrics);
final ServerConnector fileServerConnector = fileServerConnectorProvider
.createConnector("file-server-connector", properties);
fileServerConnector.setMessageProcessor(fileProcessor);
@@ -745,6 +770,10 @@ private void deployServers() throws ConnectionUnavailableException {
};
fileSourceConfiguration.getExecutorService().execute(runnableServer);
this.fileServerConnectorStarted = true;
+ if (metrics != null) {
+ metrics.getTailEnabledFilesMap().putIfAbsent(Utils.getShortFilePath(fileUri),
+ System.currentTimeMillis());
+ }
}
} else {
properties.put(Constants.URI, fileUri);
@@ -753,7 +782,8 @@ private void deployServers() throws ConnectionUnavailableException {
properties.put(Constants.HEADER_PRESENT, headerPresent);
properties.put(Constants.READ_ONLY_HEADER, readOnlyHeader);
VFSClientConnector vfsClientConnector = new VFSClientConnector();
- FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration);
+ FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration,
+ metrics);
vfsClientConnector.setMessageProcessor(fileProcessor);
VFSClientConnectorCallback vfsClientConnectorCallback = new VFSClientConnectorCallback();
Runnable runnableClient = () -> {
@@ -768,6 +798,23 @@ private void deployServers() throws ConnectionUnavailableException {
}
vfsClientConnector.send(null, vfsClientConnectorCallback, properties);
vfsClientConnectorCallback.waitTillDone(timeout, fileUri);
+ if (metrics != null) {
+ metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(fileUri),
+ StreamStatus.COMPLETED);
+ if (actionAfterProcess.equals(Constants.DELETE)) {
+ metrics.getFileDeleteMetrics().setSource(Utils.getShortFilePath(fileUri));
+ metrics.getFileDeleteMetrics().setTime(System.currentTimeMillis());
+ metrics.getFileDeleteMetrics().getDeleteMetric(1);
+ } else if (actionAfterProcess.equals(Constants.MOVE)) {
+ metrics.getFileMoveMetrics().setTime(System.currentTimeMillis());
+ metrics.getFileMoveMetrics().set_source(Utils.getShortFilePath(fileUri));
+ metrics.getFileMoveMetrics().setDestination(Utils.getShortFilePath(
+ moveAfterProcess));
+ metrics.getFileMoveMetrics().getMoveMetric(1);
+ }
+ metrics.setReadPercentage(100);
+ metrics.getCompletedTimeMetric(System.currentTimeMillis());
+ }
}
} catch (ClientConnectorException e) {
log.error(String.format("Failure occurred in vfs-client while reading the file '%s' " +
@@ -780,6 +827,10 @@ private void deployServers() throws ConnectionUnavailableException {
fileSourceConfiguration.getExecutorService().execute(runnableClient);
}
}
+ if (metrics != null) {
+ metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(fileUri),
+ StreamStatus.PROCESSING);
+ }
}
}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileCronExecutor.java b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileCronExecutor.java
index ccb13ab6..06e11398 100644
--- a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileCronExecutor.java
+++ b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileCronExecutor.java
@@ -132,7 +132,7 @@ public void processFile(String fileURI, JobExecutionContext jobExecutionContext,
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
FileSourceConfiguration fileSourceConfiguration = (FileSourceConfiguration) dataMap.get(
Constants.FILE_SOURCE_CONFIGURATION);
- FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration);
+ FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, null);
VFSClientConnector vfsClientConnector = new VFSClientConnector();
vfsClientConnector.setMessageProcessor(fileProcessor);
Map properties = generateProperties(fileSourceConfiguration, fileURI);
diff --git a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java
index a1824158..d9cb83a1 100644
--- a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java
+++ b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java
@@ -19,11 +19,15 @@
package io.siddhi.extension.io.file.listeners;
import io.siddhi.core.stream.input.source.SourceEventListener;
+import io.siddhi.extension.io.file.metrics.SourceMetrics;
+import io.siddhi.extension.io.file.metrics.StreamStatus;
import io.siddhi.extension.io.file.processors.FileProcessor;
import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.io.file.util.FileSourceConfiguration;
import io.siddhi.extension.io.file.util.FileSourceServiceProvider;
import io.siddhi.extension.io.file.util.VFSClientConnectorCallback;
+import io.siddhi.extension.util.Utils;
+import org.apache.commons.io.FilenameUtils;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.BinaryCarbonMessage;
import org.wso2.carbon.messaging.CarbonCallback;
@@ -39,6 +43,8 @@
import org.wso2.transport.remotefilesystem.message.RemoteFileSystemEvent;
import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -56,12 +62,14 @@ public class FileSystemListener implements RemoteFileSystemListener {
private SourceEventListener sourceEventListener;
private FileSourceConfiguration fileSourceConfiguration;
private FileSourceServiceProvider fileSourceServiceProvider;
+ private SourceMetrics metrics;
public FileSystemListener(SourceEventListener sourceEventListener,
- FileSourceConfiguration fileSourceConfiguration) {
+ FileSourceConfiguration fileSourceConfiguration, SourceMetrics sourceMetrics) {
this.sourceEventListener = sourceEventListener;
this.fileSourceConfiguration = fileSourceConfiguration;
this.fileSourceServiceProvider = FileSourceServiceProvider.getInstance();
+ this.metrics = sourceMetrics;
}
@Override
@@ -75,9 +83,15 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
String fileURI = a.toURI().toString();
VFSClientConnector vfsClientConnector;
FileProcessor fileProcessor;
+ fileSourceConfiguration.setCurrentlyReadingFileURI(fileURI);
+ String shortenFilePath = Utils.getShortFilePath(fileURI);
+ if (metrics != null) {
+ metrics.getSourceFileStatusMap().putIfAbsent(shortenFilePath, StreamStatus.PROCESSING);
+ metrics.setFilePath(fileURI);
+ }
if (Constants.TEXT_FULL.equalsIgnoreCase(mode)) {
vfsClientConnector = new VFSClientConnector();
- fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration);
+ fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, metrics);
vfsClientConnector.setMessageProcessor(fileProcessor);
Map properties = generateProperties(fileSourceConfiguration, fileURI);
VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback();
@@ -97,10 +111,13 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
} catch (ClientConnectorException e) {
log.error(String.format("Failed to provide file '%s' for consuming.", fileURI), e);
carbonCallback.done(carbonMessage);
+ if (metrics != null) {
+ metrics.getSourceFileStatusMap().replace(shortenFilePath, StreamStatus.ERROR);
+ }
}
} else if (Constants.BINARY_FULL.equalsIgnoreCase(mode)) {
vfsClientConnector = new VFSClientConnector();
- fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration);
+ fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, metrics);
vfsClientConnector.setMessageProcessor(fileProcessor);
Map properties = generateProperties(fileSourceConfiguration, fileURI);
VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback();
@@ -120,18 +137,24 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
}
} catch (ClientConnectorException e) {
log.error(String.format("Failed to provide file '%s' for consuming.", fileURI), e);
+ if (metrics != null) {
+ metrics.getSourceFileStatusMap().replace(shortenFilePath, StreamStatus.ERROR);
+ }
}
} else if (Constants.LINE.equalsIgnoreCase(mode) || Constants.REGEX.equalsIgnoreCase(mode)) {
Map properties = generateProperties(fileSourceConfiguration, fileURI);
if (fileSourceConfiguration.isTailingEnabled()) {
fileSourceConfiguration.setTailedFileURI(fileURI);
+ if (metrics != null) {
+ metrics.getTailEnabledFilesMap().putIfAbsent(shortenFilePath,
+ System.currentTimeMillis());
+ }
if (fileSourceConfiguration.getTailedFileURIMap().contains(fileURI)) {
properties.put(Constants.START_POSITION, fileSourceConfiguration.getFilePointer());
properties.put(Constants.PATH, fileURI);
FileServerConnectorProvider fileServerConnectorProvider =
fileSourceServiceProvider.getFileServerConnectorProvider();
- fileProcessor = new FileProcessor(sourceEventListener,
- fileSourceConfiguration);
+ fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, metrics);
final ServerConnector fileServerConnector = fileServerConnectorProvider
.createConnector("file-server-connector", properties);
fileServerConnector.setMessageProcessor(fileProcessor);
@@ -140,7 +163,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(
fileURI.getBytes(StandardCharsets.UTF_8)), true);
FileServerExecutor fileServerExecutor = new FileServerExecutor(carbonMessage,
- carbonCallback, fileServerConnector, fileURI);
+ carbonCallback, fileServerConnector, fileURI, metrics);
if (log.isDebugEnabled()) {
log.debug("fileServerExecutor started with file tailing for file: " + fileURI);
}
@@ -148,7 +171,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
}
} else {
vfsClientConnector = new VFSClientConnector();
- fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration);
+ fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, metrics);
vfsClientConnector.setMessageProcessor(fileProcessor);
VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback();
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(
@@ -167,6 +190,9 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
}
} catch (ClientConnectorException e) {
log.error(String.format("Failed to provide file '%s' for consuming.", fileURI), e);
+ if (metrics != null) {
+ metrics.getSourceFileStatusMap().replace(shortenFilePath, StreamStatus.ERROR);
+ }
}
}
}
@@ -190,13 +216,15 @@ static class FileServerExecutor implements Runnable {
CarbonCallback carbonCallback = null;
CarbonMessage carbonMessage = null;
String fileURI = null;
+ SourceMetrics metrics;
FileServerExecutor(CarbonMessage carbonMessage, CarbonCallback carbonCallback,
- ServerConnector fileServerConnector, String fileURI) {
+ ServerConnector fileServerConnector, String fileURI, SourceMetrics metrics) {
this.fileURI = fileURI;
this.fileServerConnector = fileServerConnector;
this.carbonCallback = carbonCallback;
this.carbonMessage = carbonMessage;
+ this.metrics = metrics;
}
@Override
@@ -207,6 +235,9 @@ public void run() {
log.error(String.format("Failed to start the server for file '%s'. " +
"Hence starting to process next file.", fileURI));
carbonCallback.done(carbonMessage);
+ if (metrics != null) {
+ metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(fileURI), StreamStatus.ERROR);
+ }
}
}
}
@@ -219,6 +250,9 @@ private void reProcessFile(VFSClientConnector vfsClientConnector,
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(
fileUri.getBytes(StandardCharsets.UTF_8)), true);
String moveAfterProcess = fileSourceConfiguration.getMoveAfterProcess();
+ if (metrics != null) {
+ metrics.setFilePath(fileUri);
+ }
try {
if (fileSourceConfiguration.getActionAfterProcess() != null) {
if (fileSourceConfiguration.getMoveAfterProcess() != null) {
@@ -230,11 +264,61 @@ private void reProcessFile(VFSClientConnector vfsClientConnector,
}
vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, reGeneratedProperties);
vfsClientConnectorCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileUri);
+ if (metrics != null) {
+ fileSourceConfiguration.getExecutorService().execute(() -> {
+ metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(fileUri),
+ StreamStatus.COMPLETED);
+ increaseMetricsAfterProcess(fileSourceConfiguration.getMoveAfterProcess(), 1);
+ });
+ }
}
+
} catch (ClientConnectorException e) {
+ if (metrics != null) {
+ increaseMetricsAfterProcess(fileSourceConfiguration.getMoveAfterProcess(), 0);
+ }
log.error(String.format("Failure occurred in vfs-client while reading the file '%s'.", fileUri), e);
} catch (InterruptedException e) {
log.error(String.format("Failed to get callback from vfs-client for file '%s'.", fileUri), e);
+ } finally {
+ if (metrics != null) {
+ metrics.setFilePath(fileUri);
+ metrics.getCompletedTimeMetric(System.currentTimeMillis());
+ }
+ }
+ }
+
+ private String getFileName(String uri, String protocol) {
+ try {
+ URL url = new URL(String.format("%s%s%s", protocol, File.separator, uri));
+ return FilenameUtils.getName(url.getPath());
+ } catch (MalformedURLException e) {
+ log.error(String.format("Failed to extract file name from the uri '%s'.", uri), e);
+ return null;
+ }
+ }
+
+ private String constructPath(String baseUri, String fileName) {
+ if (baseUri != null && fileName != null) {
+ if (baseUri.endsWith(File.separator)) {
+ return String.format("%s%s", baseUri, fileName);
+ } else {
+ return String.format("%s%s%s", baseUri, File.separator, fileName);
+ }
+ } else {
+ return null;
+ }
+ }
+
+ private void increaseMetricsAfterProcess(String moveAfterProcess, int value) {
+ if (moveAfterProcess == null) {
+ metrics.getFileDeleteMetrics().setTime(System.currentTimeMillis());
+ metrics.getFileDeleteMetrics().getDeleteMetric(value);
+ } else {
+ metrics.getFileMoveMetrics().setDestination(Utils.getShortFilePath(moveAfterProcess));
+ metrics.getFileMoveMetrics().setTime(System.currentTimeMillis());
+ metrics.getFileMoveMetrics().getMoveMetric(value);
}
+ metrics.setReadPercentage(100.0);
}
}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/metrics/FileArchiveMetrics.java b/component/src/main/java/io/siddhi/extension/io/file/metrics/FileArchiveMetrics.java
new file mode 100644
index 00000000..5defc134
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/file/metrics/FileArchiveMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.siddhi.extension.io.file.metrics;
+
+import org.wso2.carbon.metrics.core.Level;
+import org.wso2.carbon.si.metrics.core.internal.MetricsManagement;
+
+/**
+ * Class which is holds the metrics to monitor Archive files operations.
+ */
+public class FileArchiveMetrics extends Metrics {
+
+ private String destination;
+ private String type;
+ protected String source;
+ protected Long time;
+
+ public FileArchiveMetrics(String siddhiAppName) {
+ super(siddhiAppName);
+ }
+
+ public void getArchiveMetric(int status) {
+ MetricsManagement.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Operations.Archived.%s.%s.%s.%s",
+ siddhiAppName, type, time + ".time", source + ".source", destination + ".destination"),
+ Level.INFO, () -> status);
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public void setTime(Long time) {
+ this.time = time;
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/metrics/FileCopyMetrics.java b/component/src/main/java/io/siddhi/extension/io/file/metrics/FileCopyMetrics.java
new file mode 100644
index 00000000..5cf09d95
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/file/metrics/FileCopyMetrics.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.siddhi.extension.io.file.metrics;
+
+import org.wso2.carbon.metrics.core.Level;
+import org.wso2.carbon.si.metrics.core.internal.MetricsManagement;
+
+/**
+ * Class which is holds the metrics to monitor Copy file operations.
+ */
+public class FileCopyMetrics extends Metrics {
+
+ private String source;
+ private String destination;
+ private long time;
+
+ public FileCopyMetrics(String siddhiAppName) {
+ super(siddhiAppName);
+ }
+
+ public void getCopyMetric(int status) {
+ MetricsManagement.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Operations.Copy.%s.%s.%s",
+ siddhiAppName, time + ".time", source + ".source", destination + ".destination"),
+ Level.INFO, () -> status);
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public void setTime(long time) {
+ this.time = time;
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/metrics/FileDeleteMetrics.java b/component/src/main/java/io/siddhi/extension/io/file/metrics/FileDeleteMetrics.java
new file mode 100644
index 00000000..fc8ee35a
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/file/metrics/FileDeleteMetrics.java
@@ -0,0 +1,47 @@
+package io.siddhi.extension.io.file.metrics;
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.wso2.carbon.metrics.core.Level;
+import org.wso2.carbon.si.metrics.core.internal.MetricsManagement;
+
+/**
+ * Class which is holds the metrics to monitor Delete file operations.
+ */
+public class FileDeleteMetrics extends Metrics {
+
+ private String source;
+ private long time;
+
+ public FileDeleteMetrics(String siddhiAppName) {
+ super(siddhiAppName);
+ }
+
+ public void getDeleteMetric(int status) {
+ MetricsManagement.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Operations.Delete.%s.%s",
+ siddhiAppName, time + ".time", source + ".source"), Level.INFO, () -> status);
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public void setTime(long time) {
+ this.time = time;
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/metrics/FileMoveMetrics.java b/component/src/main/java/io/siddhi/extension/io/file/metrics/FileMoveMetrics.java
new file mode 100644
index 00000000..b4dc9ee7
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/file/metrics/FileMoveMetrics.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.siddhi.extension.io.file.metrics;
+
+import org.wso2.carbon.metrics.core.Level;
+import org.wso2.carbon.si.metrics.core.internal.MetricsManagement;
+
+/**
+ * Class which is holds the metrics to monitor Move file operations.
+ */
+public class FileMoveMetrics extends Metrics {
+
+ private String source;
+ private String destination;
+ private long time;
+
+ public FileMoveMetrics(String siddhiAppName) {
+ super(siddhiAppName);
+ }
+
+ public void getMoveMetric(int status) {
+ MetricsManagement.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Operations.Move.%s.%s.%s",
+ siddhiAppName, time + ".time", source + ".source", destination + ".destination"), Level.INFO,
+ () -> status);
+ }
+
+ public void set_source(String source) {
+ this.source = source;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public void setTime(long time) {
+ this.time = time;
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/metrics/Metrics.java b/component/src/main/java/io/siddhi/extension/io/file/metrics/Metrics.java
new file mode 100644
index 00000000..592ea1ea
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/file/metrics/Metrics.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.siddhi.extension.io.file.metrics;
+
+import org.wso2.carbon.metrics.core.Level;
+import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Parent class of the SinkMetrics, SourceMetrics, File Operation metrics classes.
+ */
+public class Metrics {
+
+ protected Set filesURI;
+ protected Map fileNamesMap; //fileURI, fileName
+ protected String siddhiAppName;
+
+ protected Metrics(String siddhiAppName) {
+ this.siddhiAppName = siddhiAppName;
+ filesURI = new HashSet<>();
+ fileNamesMap = new HashMap<>();
+ MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File", siddhiAppName), Level.INFO).inc();
+ }
+
+ public Set getFilesURI() {
+ return filesURI;
+ }
+
+ public Map getFileNames() {
+ return fileNamesMap;
+ }
+
+}
+
diff --git a/component/src/main/java/io/siddhi/extension/io/file/metrics/SinkMetrics.java b/component/src/main/java/io/siddhi/extension/io/file/metrics/SinkMetrics.java
new file mode 100644
index 00000000..50da9fb6
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/file/metrics/SinkMetrics.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.siddhi.extension.io.file.metrics;
+
+import com.google.common.base.Stopwatch;
+import io.siddhi.extension.util.Utils;
+import org.apache.log4j.Logger;
+import org.wso2.carbon.metrics.core.Counter;
+import org.wso2.carbon.metrics.core.Gauge;
+import org.wso2.carbon.metrics.core.Level;
+import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
+import org.wso2.carbon.si.metrics.core.internal.MetricsManagement;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Class which is holds the metrics to monitor source operations.
+ */
+public class SinkMetrics extends Metrics {
+ private static final Logger log = Logger.getLogger(SinkMetrics.class);
+ private final Map sinkFileStatusMap = new HashMap<>(); // string -> fileURI
+ private final Map sinkFileLastPublishedTimeMap = new HashMap<>();
+ private final Map sinkElapsedTimeMap = new HashMap<>();
+ private boolean isStarted;
+ private String filePath;
+ private final String mapType;
+ private final String streamName;
+ private String fileName;
+
+ public SinkMetrics(String siddhiAppName, String mapType, String streamName) {
+ super(siddhiAppName);
+ this.mapType = mapType;
+ this.streamName = streamName;
+ }
+
+ public Counter getSinkFilesEventCount() {
+ return MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Sinks.event.count.%s.%s.%s.%s",
+ siddhiAppName, fileName + ".filename", mapType, streamName, filePath), Level.INFO);
+ }
+
+ public Counter getSinkLinesCount() {
+ return MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Sinks.%s.%s",
+ siddhiAppName, "lines_count", filePath), Level.INFO);
+ }
+
+ public Counter getWriteBytes() {
+ return MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Sinks.%s.%s",
+ siddhiAppName, "total_written_byte", filePath), Level.INFO);
+ }
+
+ public Counter getSinkDroppedEvents() {
+ return MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Sinks.%s.%s",
+ siddhiAppName, "dropped_events", filePath), Level.INFO);
+ }
+
+ public Counter getSinkFileSize() {
+ return MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Sinks.%s.%s",
+ siddhiAppName, "file_size", filePath), Level.INFO);
+ }
+
+ public void setSinkLastPublishedTime() {
+ MetricsDataHolder.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Sinks.%s.%s",
+ siddhiAppName, "last_published_time", filePath),
+ Level.INFO, () -> sinkFileLastPublishedTimeMap.getOrDefault(filePath, 0L));
+ }
+
+ public void setSinkElapsedTime(String fileURI) {
+ MetricsManagement.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Sinks.%s.%s",
+ siddhiAppName, "elapsed_time", filePath),
+ Level.INFO, () -> {
+ if (sinkElapsedTimeMap.containsKey(fileURI)) {
+ return sinkElapsedTimeMap.get(fileURI).elapsed().toMillis();
+ } else {
+ return 0;
+ }
+ });
+ }
+
+ public void setSinkFileStatusMetrics() {
+ MetricsManagement.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Sinks.%s.%s",
+ siddhiAppName, "file_status", filePath), Level.INFO, new FileStatusGauge(filePath));
+ }
+
+ public void updateMetrics(ExecutorService executorService) {
+ if (!isStarted) {
+ executorService.execute(() -> {
+ isStarted = true;
+ while (isStarted) {
+ try {
+ if (!sinkFileStatusMap.isEmpty()) {
+ sinkFileLastPublishedTimeMap.forEach((filePath, lastPublishedTime) -> {
+ long idleTime = System.currentTimeMillis() - lastPublishedTime;
+ if (idleTime / 1000 >= 8) {
+ sinkFileStatusMap.replace(filePath, StreamStatus.IDLE);
+ }
+ });
+ }
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ log.error("Error while updating the status of files.", e);
+ }
+ }
+ });
+
+ }
+ }
+
+ public void setFilePath(String fileURI) {
+ this.filePath = Utils.getShortFilePath(fileURI);;
+ this.fileName = Utils.getFileName(fileURI, this);
+
+ }
+
+ public Map getSinkFileStatusMap() {
+ return sinkFileStatusMap;
+ }
+
+ public Map getSinkFileLastPublishedTimeMap() {
+ return sinkFileLastPublishedTimeMap;
+ }
+
+ public Map getSinkElapsedTimeMap() {
+ return sinkElapsedTimeMap;
+ }
+
+ /**
+ * Class which is used to get the status of the file.
+ */
+ public class FileStatusGauge implements Gauge {
+ private final String filePath;
+
+ public FileStatusGauge(String filePath) {
+ this.filePath = Utils.getShortFilePath(filePath);
+ }
+
+
+ @Override
+ public Integer getValue() {
+ if (sinkFileStatusMap.containsKey(filePath)) {
+ return sinkFileStatusMap.get(filePath).ordinal();
+ } else {
+ return 0;
+ }
+ }
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/metrics/SourceMetrics.java b/component/src/main/java/io/siddhi/extension/io/file/metrics/SourceMetrics.java
new file mode 100644
index 00000000..0f2463dd
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/file/metrics/SourceMetrics.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.siddhi.extension.io.file.metrics;
+
+import io.siddhi.extension.util.Utils;
+import org.apache.log4j.Logger;
+import org.wso2.carbon.metrics.core.Counter;
+import org.wso2.carbon.metrics.core.Gauge;
+import org.wso2.carbon.metrics.core.Level;
+import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Class which is holds the metrics to monitor sink operations.
+ */
+public class SourceMetrics extends Metrics {
+ private static final Logger log = Logger.getLogger(SourceMetrics.class);
+ private final Map sourceFileStatusMap = new HashMap<>();
+ private final Map lastConsumedTimeMap = new HashMap<>(); //to get the last consumed time
+
+ private boolean isStarted;
+ private String filePath;
+ private String fileName;
+ private final String readingMode;
+ private final String streamName;
+ private final FileDeleteMetrics fileDeleteMetrics;
+ private final FileMoveMetrics fileMoveMetrics;
+ private double readPercentage;
+
+ public SourceMetrics(String siddhiAppName, String readingMode, String streamName) {
+ super(siddhiAppName);
+ this.readingMode = readingMode;
+ this.streamName = streamName;
+ this.fileDeleteMetrics = new FileDeleteMetrics(siddhiAppName);
+ this.fileMoveMetrics = new FileMoveMetrics(siddhiAppName);
+ }
+
+ public Counter getSourceFileEventCountMetric() {
+ return MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.event.count.%s.%s.%s.%s",
+ siddhiAppName, fileName + ".filename", readingMode, streamName, filePath), Level.INFO);
+ }
+
+ public Counter getReadByteMetric() {
+ return MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "total_read_byte", filePath), Level.INFO);
+ }
+
+ public Counter getReadLineCountMetric() {
+ return MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "lines_count", filePath), Level.INFO);
+ }
+
+ public void getElapseTimeMetric(Gauge gauge) {
+ MetricsDataHolder.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "elapse_time", filePath), Level.INFO, gauge);
+ }
+
+ public Counter getDroppedEventCountMetric() {
+ return MetricsDataHolder.getInstance().getMetricService()
+ .counter(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "dropped_events", filePath), Level.INFO);
+ }
+
+
+ public void getFileSizeMetric(Gauge gauge) {
+ MetricsDataHolder.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "file_size", filePath), Level.INFO, gauge);
+ }
+
+ public void getFileStatusMetric() {
+ MetricsDataHolder.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "file_status", filePath), Level.INFO, new FileStatusGauge(filePath));
+ }
+
+ public void getStartedTimeMetric(long startTime) {
+ MetricsDataHolder.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "started_time", filePath), Level.INFO, () -> startTime);
+ }
+
+ public void getCompletedTimeMetric(long completedTime) {
+ MetricsDataHolder.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "completed_time", filePath), Level.INFO, () -> completedTime);
+ }
+
+ public void getTailEnabledMetric(int enable) {
+ MetricsDataHolder.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "tailing_enable", filePath), Level.INFO, () -> enable);
+ }
+
+ public void getReadPercentageMetric() {
+ MetricsDataHolder.getInstance().getMetricService()
+ .gauge(String.format("io.siddhi.SiddhiApps.%s.Siddhi.File.Source.%s.%s",
+ siddhiAppName, "read_percentage", filePath), Level.INFO, () -> readPercentage);
+ }
+
+ public void setFilePath(String fileURI) {
+ this.filePath = Utils.getShortFilePath(fileURI);
+ this.fileName = Utils.getFileName(fileURI, this);
+ this.fileMoveMetrics.set_source(filePath);
+ this.fileDeleteMetrics.setSource(filePath);
+
+ }
+
+ public void updateMetrics(ExecutorService executorService) {
+ if (!isStarted) {
+ executorService.execute(() -> {
+ isStarted = true;
+ while (isStarted) {
+ try {
+ if (!lastConsumedTimeMap.isEmpty()) {
+ lastConsumedTimeMap.forEach((filePath, lastModifiedTime) -> {
+ long idleTime = System.currentTimeMillis() - lastModifiedTime;
+ if (idleTime / 1000 >= 8) {
+ sourceFileStatusMap.replace(filePath, StreamStatus.IDLE);
+ }
+ });
+ }
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ log.error("Error while updating the status of files.", e);
+ }
+ }
+ });
+ }
+ }
+
+ public FileDeleteMetrics getFileDeleteMetrics() {
+ return fileDeleteMetrics;
+ }
+
+ public FileMoveMetrics getFileMoveMetrics() {
+ return fileMoveMetrics;
+ }
+
+ public Map getSourceFileStatusMap() {
+ return sourceFileStatusMap;
+ }
+
+ public Map getTailEnabledFilesMap() {
+ return lastConsumedTimeMap;
+ }
+
+ public void setReadPercentage(double readPercentage) {
+ this.readPercentage = readPercentage;
+ }
+
+ /**
+ * Gauge implementation to get the status of the file.
+ */
+ private class FileStatusGauge implements Gauge {
+
+ private final String fileURI;
+
+ private FileStatusGauge(String fileURI) {
+ this.fileURI = fileURI;
+ }
+
+ @Override
+ public Integer getValue() {
+ if (sourceFileStatusMap.containsKey(fileURI)) {
+ return sourceFileStatusMap.get(fileURI).ordinal();
+ }
+ return 0;
+ }
+ }
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/metrics/StreamStatus.java b/component/src/main/java/io/siddhi/extension/io/file/metrics/StreamStatus.java
new file mode 100644
index 00000000..bc2c5437
--- /dev/null
+++ b/component/src/main/java/io/siddhi/extension/io/file/metrics/StreamStatus.java
@@ -0,0 +1,13 @@
+package io.siddhi.extension.io.file.metrics;
+
+/**
+ * Enum that defines stream status.
+ */
+public enum StreamStatus {
+ CONNECTING,
+ PROCESSING,
+ COMPLETED,
+ IDLE,
+ RETRY,
+ ERROR,
+}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/processors/FileProcessor.java b/component/src/main/java/io/siddhi/extension/io/file/processors/FileProcessor.java
index 000a3e86..961de0d1 100644
--- a/component/src/main/java/io/siddhi/extension/io/file/processors/FileProcessor.java
+++ b/component/src/main/java/io/siddhi/extension/io/file/processors/FileProcessor.java
@@ -18,9 +18,13 @@
package io.siddhi.extension.io.file.processors;
+import com.google.common.base.Stopwatch;
import io.siddhi.core.stream.input.source.SourceEventListener;
+import io.siddhi.extension.io.file.metrics.SourceMetrics;
+import io.siddhi.extension.io.file.metrics.StreamStatus;
import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.io.file.util.FileSourceConfiguration;
+import io.siddhi.extension.util.Utils;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.BinaryCarbonMessage;
import org.wso2.carbon.messaging.CarbonCallback;
@@ -31,6 +35,7 @@
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
@@ -50,7 +55,19 @@ public class FileProcessor implements CarbonMessageProcessor {
private StringBuilder sb;
private String[] requiredProperties;
- public FileProcessor(SourceEventListener sourceEventListener, FileSourceConfiguration fileSourceConfiguration) {
+ private Stopwatch stopwatch;
+ private long lineCount;
+ private long readingLine;
+ private long totalReadByteSize;
+ private double fileSize;
+ private String fileURI;
+ private SourceMetrics metrics;
+ private long startedTime;
+ private long completedTime;
+ private boolean send;
+
+ public FileProcessor(SourceEventListener sourceEventListener, FileSourceConfiguration fileSourceConfiguration,
+ SourceMetrics sourceMetrics) {
this.sourceEventListener = sourceEventListener;
this.fileSourceConfiguration = fileSourceConfiguration;
this.requiredProperties = fileSourceConfiguration.getRequiredProperties();
@@ -61,6 +78,41 @@ public FileProcessor(SourceEventListener sourceEventListener, FileSourceConfigur
sb = new StringBuilder();
}
pattern = fileSourceConfiguration.getPattern();
+ if (sourceMetrics != null) {
+ this.metrics = sourceMetrics;
+ this.fileURI = fileSourceConfiguration.getCurrentlyReadingFileURI();
+ fileSourceConfiguration.getExecutorService().execute(() -> {
+ stopwatch = Stopwatch.createStarted();
+ startedTime = System.currentTimeMillis();
+ fileSize = Utils.getFileSize(fileURI); //converts into KB
+ metrics.getStartedTimeMetric(System.currentTimeMillis());
+ boolean add = metrics.getFilesURI().add(fileURI);
+ if (add) {
+ try {
+ lineCount = Utils.getLinesCount(fileURI);
+ metrics.getFileSizeMetric(() -> fileSize);
+ metrics.getReadPercentageMetric();
+ metrics.getReadLineCountMetric().inc(lineCount);
+ metrics.getDroppedEventCountMetric();
+ if (fileSourceConfiguration.isTailingEnabled()) {
+ metrics.getTailEnabledMetric(1);
+ metrics.getElapseTimeMetric(() -> stopwatch.elapsed().toMillis());
+ } else {
+ metrics.getTailEnabledMetric(0);
+ metrics.getElapseTimeMetric(() -> {
+ if (completedTime != 0) {
+ return completedTime - startedTime;
+ }
+ return 0;
+ });
+ }
+ metrics.getFileStatusMetric();
+ } catch (IOException e) {
+ log.error("Error occurred while getting the lines count in '" + fileURI + "'.", e);
+ }
+ }
+ });
+ }
}
public boolean receive(CarbonMessage carbonMessage, CarbonCallback carbonCallback) throws Exception {
@@ -74,11 +126,13 @@ public boolean receive(CarbonMessage carbonMessage, CarbonCallback carbonCallbac
(org.wso2.transport.file.connector.server.util.Constants.EOF, true);
sourceEventListener.onEvent(new String(content, Constants.UTF_8),
getRequiredPropertyValues(carbonMessage));
+ send = true;
}
} else if (Constants.BINARY_FULL.equalsIgnoreCase(mode)) {
if (msg.length() > 0) {
carbonCallback.done(carbonMessage);
sourceEventListener.onEvent(content, getRequiredPropertyValues(carbonMessage));
+ send = true;
}
} else if (Constants.LINE.equalsIgnoreCase(mode)) {
if (!fileSourceConfiguration.isTailingEnabled()) {
@@ -89,6 +143,7 @@ public boolean receive(CarbonMessage carbonMessage, CarbonCallback carbonCallbac
if (line.length() > 0) {
readBytes = line.length();
sourceEventListener.onEvent(line.trim(), getRequiredPropertyValues(carbonMessage));
+ send = true;
}
}
carbonCallback.done(carbonMessage);
@@ -98,6 +153,10 @@ public boolean receive(CarbonMessage carbonMessage, CarbonCallback carbonCallbac
fileSourceConfiguration.updateFilePointer(
(Long) carbonMessage.getProperties().get(Constants.CURRENT_POSITION));
sourceEventListener.onEvent(msg, getRequiredPropertyValues(carbonMessage));
+ send = true;
+ if (metrics != null) {
+ increaseTailingMetrics();
+ }
}
}
} else if (Constants.REGEX.equalsIgnoreCase(mode)) {
@@ -141,15 +200,18 @@ public boolean receive(CarbonMessage carbonMessage, CarbonCallback carbonCallbac
carbonMessage.setProperty
(org.wso2.transport.file.connector.server.util.Constants.EOF, false);
sourceEventListener.onEvent(prevEvent, getRequiredPropertyValues(carbonMessage));
+ send = true;
}
carbonMessage.setProperty
(org.wso2.transport.file.connector.server.util.Constants.EOF, true);
sourceEventListener.onEvent(event, getRequiredPropertyValues(carbonMessage));
+ send = true;
} else if (matchFound) {
if (prevEvent != null) {
carbonMessage.setProperty
(org.wso2.transport.file.connector.server.util.Constants.EOF, false);
sourceEventListener.onEvent(prevEvent, getRequiredPropertyValues(carbonMessage));
+ send = true;
}
prevEvent = event;
} else {
@@ -160,6 +222,7 @@ public boolean receive(CarbonMessage carbonMessage, CarbonCallback carbonCallbac
carbonMessage.setProperty
(org.wso2.transport.file.connector.server.util.Constants.EOF, true);
sourceEventListener.onEvent(prevEvent, getRequiredPropertyValues(carbonMessage));
+ send = true;
}
}
}
@@ -181,6 +244,7 @@ public boolean receive(CarbonMessage carbonMessage, CarbonCallback carbonCallbac
event = m.group(0);
sourceEventListener.onEvent
(sb.substring(sb.indexOf(event)), getRequiredPropertyValues(carbonMessage));
+ send = true;
}
}
if (carbonCallback != null) {
@@ -204,7 +268,11 @@ public boolean receive(CarbonMessage carbonMessage, CarbonCallback carbonCallbac
remainedLength = sb.length() - event.length() - remainedLength - 1;
}
sourceEventListener.onEvent(event, getRequiredPropertyValues(carbonMessage));
+ send = true;
readBytes += content.length;
+ if (metrics != null) {
+ increaseTailingMetrics();
+ }
}
String tmp;
tmp = sb.substring(lastMatchedIndex);
@@ -216,8 +284,18 @@ public boolean receive(CarbonMessage carbonMessage, CarbonCallback carbonCallbac
}
}
}
+ if (metrics != null && send) {
+ increaseMetrics(content.length);
+ totalReadByteSize += content.length;
+ readingLine++;
+ completedTime = System.currentTimeMillis();
+ send = false;
+ }
return true;
} else {
+ if (metrics != null) {
+ metrics.getDroppedEventCountMetric().inc();
+ }
return false;
}
}
@@ -247,4 +325,21 @@ private String[] getRequiredPropertyValues(CarbonMessage carbonMessage) {
}
return values;
}
+
+ private void increaseMetrics(int byteLength) {
+ metrics.getSourceFileEventCountMetric().inc();
+ metrics.getReadByteMetric().inc(byteLength);
+ metrics.getElapseTimeMetric(() -> stopwatch.elapsed().toMillis());
+ metrics.setReadPercentage(totalReadByteSize / fileSize * 100);
+ }
+
+ private void increaseTailingMetrics() {
+ if (readingLine >= lineCount) {
+ metrics.getReadLineCountMetric().inc();
+ }
+ fileSize = Utils.getFileSize(fileSourceConfiguration.getCurrentlyReadingFileURI());
+ metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(fileURI), StreamStatus.PROCESSING);
+ metrics.getTailEnabledFilesMap().replace(Utils.getShortFilePath(fileURI), System.currentTimeMillis());
+ }
+
}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java b/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java
index c11acb7d..e55b64b7 100644
--- a/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java
+++ b/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java
@@ -111,4 +111,7 @@ public class Constants {
public static final String LAST_MODIFIED_TIMESTAMP = "lastModifiedTimestamp";
public static final String UTF_8 = "UTF-8";
+
+ /*prometheus reporte values*/
+ public static final String PROMETHEUS_REPORTER_NAME = "prometheus";
}
diff --git a/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java b/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java
index 24426a80..71491b38 100644
--- a/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java
+++ b/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java
@@ -61,6 +61,9 @@ public class FileSourceConfiguration {
private String uri = null;
private Scheduler scheduler = null;
+ private String currentlyReadingFileURI;
+ // once app is disconnected
+
public FileSourceConfiguration() {
tailingRegexStringBuilder = new StringBuilder();
}
@@ -256,6 +259,14 @@ public void setHeaderPresent(String headerPresent) {
this.headerPresent = headerPresent;
}
+ public String getCurrentlyReadingFileURI() {
+ return currentlyReadingFileURI;
+ }
+
+ public void setCurrentlyReadingFileURI(String currentlyReadingFileURI) {
+ this.currentlyReadingFileURI = currentlyReadingFileURI;
+ }
+
public String getReadOnlyHeader() {
return readOnlyHeader;
}
diff --git a/component/src/main/java/io/siddhi/extension/util/Utils.java b/component/src/main/java/io/siddhi/extension/util/Utils.java
index 7aa89550..2048dc0e 100644
--- a/component/src/main/java/io/siddhi/extension/util/Utils.java
+++ b/component/src/main/java/io/siddhi/extension/util/Utils.java
@@ -19,6 +19,7 @@
package io.siddhi.extension.util;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
+import io.siddhi.extension.io.file.metrics.Metrics;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileSystemManager;
@@ -30,8 +31,20 @@
import org.wso2.transport.file.connector.server.util.Constants;
import org.wso2.transport.file.connector.server.util.FileTransportUtils;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Properties;
@@ -122,4 +135,75 @@ private static void addOptions(String scheme, Map schemeFileOpti
}
}
}
+
+ public static long getFileSize(String filePathUri) {
+ filePathUri = getFilePath(filePathUri);
+ File file = new File(filePathUri);
+ return file.length();
+ }
+
+ private static String getFilePath(String uri) {
+ if (uri.startsWith("file:")) {
+ uri = uri.replaceFirst("file:", "");
+ }
+ return uri.replace("%20", " ");
+ }
+
+ public static long getLinesCount(String uri) throws IOException {
+ if (uri.startsWith("file:")) {
+ uri = uri.replaceFirst("file:", "");
+ }
+ uri = uri.replace("%20", " ");
+ CharsetDecoder dec = StandardCharsets.UTF_8.newDecoder().onMalformedInput(CodingErrorAction.IGNORE);
+ Path path = Paths.get(uri);
+ try (Reader r = Channels.newReader(FileChannel.open(path), dec, -1);
+ BufferedReader br = new BufferedReader(r)) {
+ return br.lines()
+ .filter(line -> line.length() != 0).count();
+ }
+ }
+
+ public static String capitalizeFirstLetter(String str) {
+ return str.substring(0, 1).toUpperCase(Locale.ENGLISH) + str.substring(1);
+ }
+
+ public static String getFileName(String fileURI, Metrics metrics) {
+ fileURI = getFilePath(fileURI);
+ if (metrics.getFileNames().containsKey(fileURI)) {
+ return metrics.getFileNames().get(fileURI);
+ }
+ String[] arr = fileURI.split("/");
+ int n = arr.length;
+ StringBuilder fileName = new StringBuilder();
+ fileName.append(arr[n - 1]);
+ if (!metrics.getFileNames().containsValue(fileName.toString())) {
+ return fileName.toString();
+ }
+ for (int i = n - 2; i >= 0; i--) {
+ fileName.insert(0, "/").insert(0, arr[i]);
+ if (!metrics.getFileNames().containsValue(fileName.toString())) {
+ metrics.getFileNames().put(fileURI, fileName.toString());
+ break;
+ }
+ }
+ return fileName.toString();
+ }
+
+ public static String getShortFilePath(String fileURI) {
+ fileURI = getFilePath(fileURI);
+ if (fileURI.length() <= 40) {
+ return fileURI;
+ }
+ int n = fileURI.length();
+ int i = n - 41; // to get last 40 characters
+ char c = fileURI.charAt(i);
+ while (c != '/' && i > 0) {
+ i--;
+ c = fileURI.charAt(i);
+ }
+ if (i == 0) {
+ return fileURI;
+ }
+ return ".." + fileURI.substring(i);
+ }
}
diff --git a/pom.xml b/pom.xml
index 68d29a34..40446bbf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,13 @@
commons-io
${commons.io.version}
+
+
+ org.wso2.carbon.analytics
+ org.wso2.carbon.si.metrics.core
+ ${carbon.analytics.version}
+
+
@@ -152,6 +159,7 @@
3.6
UTF-8
UTF-8
+ 3.0.36