Skip to content

Commit

Permalink
Format java files in o.e.equinox.log.stream
Browse files Browse the repository at this point in the history
This was achieved by running:
eclipse -consolelog -nosplash -application org.eclipse.jdt.core.JavaCodeFormatter \
  -config .settings/org.eclipse.jdt.core.prefs . -data `mktemp -d`

Signed-off-by: Torbjörn SVENSSON <[email protected]>
  • Loading branch information
Torbjorn-Svensson authored and vogella committed Oct 13, 2023
1 parent c165f67 commit c4abc44
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ public void setLogStream(PushStream<LogEntry> logStream) {
this.logStream = logStream;
}

/* Open method isused to connect to the source and begin receiving a stream of events.
* It returns an AutoCloseable which can be used to close the event stream.
* If the close method is called on this object then the stream is terminated by sending a close event.
* (non-Javadoc)
* @see org.osgi.util.pushstream.PushEventSource#open(org.osgi.util.pushstream.PushEventConsumer)
/*
* Open method isused to connect to the source and begin receiving a stream of
* events. It returns an AutoCloseable which can be used to close the event
* stream. If the close method is called on this object then the stream is
* terminated by sending a close event. (non-Javadoc)
*
* @see org.osgi.util.pushstream.PushEventSource#open(org.osgi.util.pushstream.
* PushEventConsumer)
*/

@Override
Expand All @@ -63,8 +66,9 @@ public Closeable open(PushEventConsumer<? super LogEntry> aec) throws Exception
throw new IllegalStateException("Cannot add the same consumer multiple times"); //$NON-NLS-1$
}

/*when history is not equal to null then we acquire a lock to provide the full history
* to the consumer first before any other new entries
/*
* when history is not equal to null then we acquire a lock to provide the full
* history to the consumer first before any other new entries
*/
if (withHistory != null) {
historyLock.lock();
Expand All @@ -79,7 +83,7 @@ public Closeable open(PushEventConsumer<? super LogEntry> aec) throws Exception
historyList.add(e.nextElement());
}
}
//Logging the history in the order of their appearance
// Logging the history in the order of their appearance
if (historyList != null) {
while (!historyList.isEmpty()) {
LogEntry logEntry = historyList.removeLast();
Expand Down Expand Up @@ -110,10 +114,14 @@ public void logged(LogEntry entry) {
historyLock.lock();
}

/*consumer accepts the incoming log entries and returns a back pressure.
* A return of zero indicates that event delivery may continue immediately.
* A positive return value indicates that the source should delay sending any further events for the requested number of milliseconds.
* A return value of -1 indicates that no further events should be sent and that the stream can be closed.
/*
* consumer accepts the incoming log entries and returns a back pressure. A
* return of zero indicates that event delivery may continue immediately. A
* positive return value indicates that the source should delay sending any
* further events for the requested number of milliseconds. A return value of -1
* indicates that no further events should be sent and that the stream can be
* closed.
*
* @see org.osgi.util.pushstream.PushEventConsumer<T>
*/
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
* and each log entry to the LogStreamProviderFactory.
*
*/
public class LogStreamManager implements BundleActivator, ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>>, LogListener {
public class LogStreamManager implements BundleActivator,
ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>>, LogListener {
private ServiceRegistration<LogStreamProvider> logStreamServiceRegistration;
private LogStreamProviderFactory logStreamProviderFactory;
private ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
Expand All @@ -41,7 +42,9 @@ public class LogStreamManager implements BundleActivator, ServiceTrackerCustomiz

/*
* (non-Javadoc)
* @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
*
* @see
* org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
*/
@Override
public void start(BundleContext bc) throws Exception {
Expand All @@ -54,7 +57,9 @@ public void start(BundleContext bc) throws Exception {

/*
* (non-Javadoc)
* @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
*
* @see
* org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
*/
@Override
public void stop(BundleContext bundleContext) throws Exception {
Expand All @@ -66,7 +71,9 @@ public void stop(BundleContext bundleContext) throws Exception {

/*
* (non-Javadoc)
* @see org.osgi.util.tracker.ServiceTrackerCustomizer#addingService(org.osgi.framework.ServiceReference)
*
* @see org.osgi.util.tracker.ServiceTrackerCustomizer#addingService(org.osgi.
* framework.ServiceReference)
*/

@Override
Expand All @@ -78,13 +85,17 @@ public AtomicReference<LogReaderService> addingService(ServiceReference<LogReade

/*
* (non-Javadoc)
* @see org.osgi.util.tracker.ServiceTrackerCustomizer#modifiedService(org.osgi.framework.ServiceReference, java.lang.Object)
*
* @see org.osgi.util.tracker.ServiceTrackerCustomizer#modifiedService(org.osgi.
* framework.ServiceReference, java.lang.Object)
*/
@Override
public void modifiedService(ServiceReference<LogReaderService> modifiedServiceRef, AtomicReference<LogReaderService> modifiedTracked) {
public void modifiedService(ServiceReference<LogReaderService> modifiedServiceRef,
AtomicReference<LogReaderService> modifiedTracked) {
eventProducerLock.lock();
try {
// Check if the currently used reader service is lower ranked that the modified serviceRef
// Check if the currently used reader service is lower ranked that the modified
// serviceRef
ServiceReference<LogReaderService> currentServiceRef = logReaderService.getServiceReference();
if (currentServiceRef == null || modifiedServiceRef.compareTo(currentServiceRef) > 0) {
// The modified service reference is higher ranked than the currently used one;
Expand All @@ -97,7 +108,8 @@ public void modifiedService(ServiceReference<LogReaderService> modifiedServiceRe
}
// remove our listener from the currently used service
if (currentServiceRef != null) {
AtomicReference<LogReaderService> currentTracked = logReaderService.getService(currentServiceRef);
AtomicReference<LogReaderService> currentTracked = logReaderService
.getService(currentServiceRef);
if (currentTracked != null) {
LogReaderService currentLogReader = currentTracked.get();
if (currentLogReader != null) {
Expand All @@ -121,10 +133,13 @@ public void modifiedService(ServiceReference<LogReaderService> modifiedServiceRe

/*
* (non-Javadoc)
* @see org.osgi.util.tracker.ServiceTrackerCustomizer#removedService(org.osgi.framework.ServiceReference, java.lang.Object)
*
* @see org.osgi.util.tracker.ServiceTrackerCustomizer#removedService(org.osgi.
* framework.ServiceReference, java.lang.Object)
*/
@Override
public void removedService(ServiceReference<LogReaderService> removedRef, AtomicReference<LogReaderService> removedTracked) {
public void removedService(ServiceReference<LogReaderService> removedRef,
AtomicReference<LogReaderService> removedTracked) {
eventProducerLock.lock();
try {
LogReaderService removedLogReader = removedTracked.get();
Expand Down Expand Up @@ -153,8 +168,10 @@ public void removedService(ServiceReference<LogReaderService> removedRef, Atomic
}
}

/* It is used to post each log entry to the LogStreamProviderFactory
/*
* It is used to post each log entry to the LogStreamProviderFactory
* (non-Javadoc)
*
* @see org.osgi.service.log.LogListener#logged(org.osgi.service.log.LogEntry)
*/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvide
return t;
});

public LogStreamProviderFactory(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
public LogStreamProviderFactory(
ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
this.logReaderService = logReaderService;
}

/*Reader of providers map
* 1) for each provider
* - post entry to provider
/*
* Reader of providers map 1) for each provider - post entry to provider
*/
public void postLogEntry(LogEntry entry) {
eventProducerLock.readLock().lock();
Expand All @@ -60,12 +60,12 @@ public void postLogEntry(LogEntry entry) {

}

/* Writer to providers map
* 1) create new LogStreamProviderImpl
* 2) put new instance in map
* 3) return new instance
* (non-Javadoc)
* @see org.osgi.framework.ServiceFactory#getService(org.osgi.framework.Bundle, org.osgi.framework.ServiceRegistration)
/*
* Writer to providers map 1) create new LogStreamProviderImpl 2) put new
* instance in map 3) return new instance (non-Javadoc)
*
* @see org.osgi.framework.ServiceFactory#getService(org.osgi.framework.Bundle,
* org.osgi.framework.ServiceRegistration)
*/

@Override
Expand All @@ -80,14 +80,19 @@ public LogStreamProviderImpl getService(Bundle bundle, ServiceRegistration<LogSt
}
}

/* 1) Remove the logStreamProviderImpl instance associated with the bundle
* 2) close all existing LogStreams from the provider, outside the write lock
/*
* 1) Remove the logStreamProviderImpl instance associated with the bundle 2)
* close all existing LogStreams from the provider, outside the write lock
* (non-Javadoc)
* @see org.osgi.framework.ServiceFactory#ungetService(org.osgi.framework.Bundle, org.osgi.framework.ServiceRegistration, java.lang.Object)
*
* @see
* org.osgi.framework.ServiceFactory#ungetService(org.osgi.framework.Bundle,
* org.osgi.framework.ServiceRegistration, java.lang.Object)
*/

@Override
public void ungetService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration, LogStreamProvider service) {
public void ungetService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration,
LogStreamProvider service) {

LogStreamProviderImpl logStreamProviderImpl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@ public class LogStreamProviderImpl implements LogStreamProvider {
private final ReentrantReadWriteLock historyLock = new ReentrantReadWriteLock();
private final ExecutorService executor;

public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService, ExecutorService executor) {
public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService,
ExecutorService executor) {
this.logReaderService = logReaderService;
this.executor = executor;
}

/* Create a PushStream of {@link LogEntry} objects.
* The returned PushStream is
* Buffered with a buffer large enough to contain the history, if included.
* Have the QueuePolicyOption.DISCARD_OLDEST queue policy option.
* Use a shared executor.
* Have a parallelism of one.
* (non-Javadoc)
* @see org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.log.stream.LogStreamProvider.Options[])
/*
* Create a PushStream of {@link LogEntry} objects. The returned PushStream is
* Buffered with a buffer large enough to contain the history, if included. Have
* the QueuePolicyOption.DISCARD_OLDEST queue policy option. Use a shared
* executor. Have a parallelism of one. (non-Javadoc)
*
* @see
* org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.
* log.stream.LogStreamProvider.Options[])
*/
@Override
public PushStream<LogEntry> createStream(Options... options) {
Expand All @@ -65,14 +67,17 @@ public PushStream<LogEntry> createStream(Options... options) {
}
}

// A write lock is acquired in order to add logEntrySource into the Set of logEntrySources.
// A write lock is acquired in order to add logEntrySource into the Set of
// logEntrySources.
historyLock.writeLock().lock();
try {
LogEntrySource logEntrySource = new LogEntrySource(withHistory);
PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
//creating a buffered push stream
PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider
.buildStream(logEntrySource);
// creating a buffered push stream
LinkedBlockingQueue<PushEvent<? extends LogEntry>> historyQueue = new LinkedBlockingQueue<>();
PushStream<LogEntry> logStream = streamBuilder.withBuffer(historyQueue).withExecutor(executor).withParallelism(1).withQueuePolicy(QueuePolicyOption.DISCARD_OLDEST).build();
PushStream<LogEntry> logStream = streamBuilder.withBuffer(historyQueue).withExecutor(executor)
.withParallelism(1).withQueuePolicy(QueuePolicyOption.DISCARD_OLDEST).build();
logEntrySource.setLogStream(logStream);
// Adding to sources makes the source start listening for new entries
logEntrySources.add(logEntrySource);
Expand All @@ -83,7 +88,8 @@ public PushStream<LogEntry> createStream(Options... options) {
}

/*
* Send the incoming log entries to the logEntrySource.logged(entry) for the consumer to accept it.
* Send the incoming log entries to the logEntrySource.logged(entry) for the
* consumer to accept it.
*/
public void logged(LogEntry entry) {
historyLock.readLock().lock();
Expand Down

0 comments on commit c4abc44

Please sign in to comment.