From f55963e252d0b815f982153f5ad632b4f7b07f13 Mon Sep 17 00:00:00 2001 From: Peter Kriens <Peter.Kriens@aqute.biz> Date: Fri, 11 Sep 2020 10:28:30 +0200 Subject: [PATCH] [io] Refactor of ThreadIO The ThreadIO service replaces the System streams so that the output and input on a thread associated with a command can be captured. Unfortunately, this occupies the singletons of System.in/out/err. There are many others that want to use the same mechanism. This change introduces a SystemIO service that dispatches the System streams to any number of listeners. The ThreadIO is modified to use this service. By default, the Gogo runtime will register its own SystemIO service. However, it is possible to disable this with the "org.apache.felix.gogo.systemio.timeout" (SystemIO.TIMEOUT) framework property. If this is set to a numeric value, it will indicate that Gogo should look for an external service. The numeric value indicates the amount of time Gogo should wait for this service. 30000 is a common value. Additionally, there is an onClose method added to the session so that it is possible to get a callback when the session closes. Gogo commands can use this to cancel any activity related to a session. The lambda is weakly referenced so it is necessary that the command keeps a strong reference to the callback. Signed-off-by: Peter Kriens <Peter.Kriens@aqute.biz> --- .gitignore | 1 + gogo/runtime/pom.xml | 28 + .../gogo/runtime/CommandSessionImpl.java | 1631 +++++++++-------- .../gogo/runtime/activator/Activator.java | 47 +- .../gogo/runtime/activator/ServiceFacade.java | 72 + .../gogo/runtime/systemio/DelegateStream.java | 74 + .../gogo/runtime/systemio/SystemIOImpl.java | 138 ++ .../felix/gogo/runtime/threadio/Marker.java | 63 - .../gogo/runtime/threadio/ThreadIOImpl.java | 239 +-- .../runtime/threadio/ThreadInputStream.java | 98 - .../runtime/threadio/ThreadPrintStream.java | 226 --- .../felix/service/command/CommandSession.java | 6 + .../felix/service/systemio/SystemIO.java | 62 + .../felix/service/threadio/ThreadIO.java | 2 + .../gogo/runtime/AbstractParserTest.java | 7 +- .../gogo/runtime/CommandSessionTest.java | 101 + .../felix/gogo/runtime/TestTokenizer.java | 6 +- .../runtime/threadio/TestIOWithFramework.java | 184 ++ .../gogo/runtime/threadio/TestThreadIO.java | 184 +- 19 files changed, 1818 insertions(+), 1351 deletions(-) create mode 100644 gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/ServiceFacade.java create mode 100644 gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/DelegateStream.java create mode 100644 gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/SystemIOImpl.java delete mode 100644 gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/Marker.java delete mode 100644 gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadInputStream.java delete mode 100644 gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadPrintStream.java create mode 100644 gogo/runtime/src/main/java/org/apache/felix/service/systemio/SystemIO.java create mode 100644 gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/CommandSessionTest.java create mode 100644 gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestIOWithFramework.java diff --git a/.gitignore b/.gitignore index 7692143cb7..d00c04e809 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ dependency-reduced-pom.xml /.metadata/ /workspace/ .vscode +gogo/command/.to diff --git a/gogo/runtime/pom.xml b/gogo/runtime/pom.xml index 9737db71bd..42c9f69f6d 100644 --- a/gogo/runtime/pom.xml +++ b/gogo/runtime/pom.xml @@ -85,6 +85,7 @@ org.apache.felix.service.command, org.apache.felix.service.command.annotations, org.apache.felix.service.threadio, + org.apache.felix.service.systemio, </Export-Package> <Import-Package> org.osgi.service.event.*; resolution:=optional, @@ -95,4 +96,31 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>felix</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.felix</groupId> + <artifactId>org.apache.felix.framework</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + </profile> + <profile> + <id>equinox</id> + <dependencies> + <dependency> + <groupId>org.eclipse</groupId> + <artifactId>org.eclipse.osgi</artifactId> + <version>3.8.0.v20120529-1548</version> + <scope>test</scope> + </dependency> + </dependencies> + </profile> + </profiles> + </project> diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java index a2271b6605..4ef8a0081d 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; +import java.lang.ref.WeakReference; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.nio.channels.Channel; @@ -63,819 +64,845 @@ public class CommandSessionImpl implements CommandSession, Converter { - public static final String SESSION_CLOSED = "session is closed"; - public static final String VARIABLES = ".variables"; - public static final String COMMANDS = ".commands"; - public static final String CONSTANTS = ".constants"; - private static final String COLUMN = "%-20s %s\n"; - - // Streams and channels - protected InputStream in; - protected OutputStream out; - protected PrintStream pout; - protected OutputStream err; - protected PrintStream perr; - protected Channel[] channels; - - private final CommandProcessorImpl processor; - protected final ConcurrentMap<String, Object> variables = new ConcurrentHashMap<>(); - private volatile boolean closed; - private final List<JobImpl> jobs = new ArrayList<>(); - private JobListener jobListener; - - private final ExecutorService executor; - - private Path currentDir; - private ClassLoader classLoader; - - protected CommandSessionImpl(CommandProcessorImpl shell, CommandSessionImpl parent) - { - this.currentDir = parent.currentDir; - this.executor = Executors.newCachedThreadPool(ThreadUtils.namedThreadFactory("session")); - this.processor = shell; - this.channels = parent.channels; - this.in = parent.in; - this.out = parent.out; - this.err = parent.err; - this.pout = parent.pout; - this.perr = parent.perr; - } - - protected CommandSessionImpl(CommandProcessorImpl shell, InputStream in, OutputStream out, OutputStream err) - { - this.currentDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize(); - this.executor = Executors.newCachedThreadPool(ThreadUtils.namedThreadFactory("session")); - this.processor = shell; - ReadableByteChannel inCh = Channels.newChannel(in); - WritableByteChannel outCh = Channels.newChannel(out); - WritableByteChannel errCh = out == err ? outCh : Channels.newChannel(err); - this.channels = new Channel[] { inCh, outCh, errCh }; - this.in = in; - this.out = out; - this.err = err; - this.pout = out instanceof PrintStream ? (PrintStream) out : new PrintStream(out, true); - this.perr = out == err ? pout : err instanceof PrintStream ? (PrintStream) err : new PrintStream(err, true); - } - - ThreadIO threadIO() - { - return processor.threadIO; - } - - public CommandProcessor processor() - { - return processor; - } - - public ConcurrentMap<String, Object> getVariables() - { - return variables; - } - - public Path currentDir() - { - return currentDir; - } - - public void currentDir(Path path) - { - currentDir = path; - } - - public ClassLoader classLoader() - { - return classLoader != null ? classLoader : getClass().getClassLoader(); - } - - public void classLoader(ClassLoader classLoader) - { - this.classLoader = classLoader; - } - - public void close() - { - if (!this.closed) - { - this.closed = true; - this.processor.closeSession(this); - executor.shutdownNow(); - } - } - - public Object execute(CharSequence commandline) throws Exception - { - assert processor != null; - - if (closed) - { - throw new IllegalStateException(SESSION_CLOSED); - } - - processor.beforeExecute(this, commandline); - - try - { - Closure impl = new Closure(this, null, commandline); - Object result = impl.execute(this, null); - processor.afterExecute(this, commandline, result); - return result; - } - catch (Exception e) - { - processor.afterExecute(this, commandline, e); - throw e; - } - } - - public InputStream getKeyboard() - { - return in; - } - - public Object get(String name) - { - // there is no API to list all variables, so overload name == null - if (name == null || VARIABLES.equals(name)) - { - return Collections.unmodifiableSet(variables.keySet()); - } - - if (COMMANDS.equals(name)) - { - return processor.getCommands(); - } - - if (CONSTANTS.equals(name)) - { - return Collections.unmodifiableSet(processor.constants.keySet()); - } - - Object val = processor.constants.get(name); - if (val != null) - { - return val; - } - - val = variables.get("#" + name); - if (val instanceof Function) - { - try - { - val = ((Function) val).execute(this, null); - } - catch (Exception e) - { - // Ignore - } - return val; - } - else if (val != null) - { - return val; - } - - val = variables.get(name); - if (val != null) - { - return val; - } - - return processor.getCommand(name, variables.get("SCOPE")); - } - - public Object put(String name, Object value) - { - if (value != null) - { - return variables.put(name, value); - } - else - { - return variables.remove(name); - } - } - - public PrintStream getConsole() - { - return pout; - } - - @SuppressWarnings("unchecked") - public CharSequence format(Object target, int level, Converter escape) throws Exception - { - if (target == null) - { - return "null"; - } - - if (target instanceof CharSequence) - { - return (CharSequence) target; - } - - for (Converter c : processor.converters) - { - CharSequence s = c.format(target, level, this); - if (s != null) - { - return s; - } - } - - if (target.getClass().isArray()) - { - if (target.getClass().getComponentType().isPrimitive()) - { - if (target.getClass().getComponentType() == boolean.class) - { - return Arrays.toString((boolean[]) target); - } - else - { - if (target.getClass().getComponentType() == byte.class) - { - return Arrays.toString((byte[]) target); - } - else - { - if (target.getClass().getComponentType() == short.class) - { - return Arrays.toString((short[]) target); - } - else - { - if (target.getClass().getComponentType() == int.class) - { - return Arrays.toString((int[]) target); - } - else - { - if (target.getClass().getComponentType() == long.class) - { - return Arrays.toString((long[]) target); - } - else - { - if (target.getClass().getComponentType() == float.class) - { - return Arrays.toString((float[]) target); - } - else - { - if (target.getClass().getComponentType() == double.class) - { - return Arrays.toString((double[]) target); - } - else - { - if (target.getClass().getComponentType() == char.class) - { - return Arrays.toString((char[]) target); - } - } - } - } - } - } - } - } - } - target = Arrays.asList((Object[]) target); - } - if (target instanceof Collection) - { - if (level == Converter.INSPECT) - { - StringBuilder sb = new StringBuilder(); - Collection<?> c = (Collection<?>) target; - for (Object o : c) - { - sb.append(format(o, level + 1, this)); - sb.append("\n"); - } - return sb; + public static final String SESSION_CLOSED = "session is closed"; + public static final String VARIABLES = ".variables"; + public static final String COMMANDS = ".commands"; + public static final String CONSTANTS = ".constants"; + private static final String COLUMN = "%-20s %s\n"; + + // Streams and channels + protected InputStream in; + protected OutputStream out; + protected PrintStream pout; + protected OutputStream err; + protected PrintStream perr; + protected Channel[] channels; + + private final CommandProcessorImpl processor; + protected final ConcurrentMap<String, Object> variables = new ConcurrentHashMap<>(); + private volatile boolean closed; + private final List<JobImpl> jobs = new ArrayList<>(); + private JobListener jobListener; + private final List<WeakReference<Runnable>> onClose = new ArrayList<>(); + + private final ExecutorService executor; + + private Path currentDir; + private ClassLoader classLoader; + + protected CommandSessionImpl(CommandProcessorImpl shell, CommandSessionImpl parent) + { + this.currentDir = parent.currentDir; + this.executor = Executors.newCachedThreadPool(ThreadUtils.namedThreadFactory("session")); + this.processor = shell; + this.channels = parent.channels; + this.in = parent.in; + this.out = parent.out; + this.err = parent.err; + this.pout = parent.pout; + this.perr = parent.perr; + } + + protected CommandSessionImpl(CommandProcessorImpl shell, InputStream in, OutputStream out, OutputStream err) + { + this.currentDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize(); + this.executor = Executors.newCachedThreadPool(ThreadUtils.namedThreadFactory("session")); + this.processor = shell; + ReadableByteChannel inCh = Channels.newChannel(in); + WritableByteChannel outCh = Channels.newChannel(out); + WritableByteChannel errCh = out == err ? outCh : Channels.newChannel(err); + this.channels = new Channel[] {inCh, outCh, errCh}; + this.in = in; + this.out = out; + this.err = err; + this.pout = out instanceof PrintStream ? (PrintStream) out : new PrintStream(out, true); + this.perr = out == err ? pout : err instanceof PrintStream ? (PrintStream) err : new PrintStream(err, true); + } + + ThreadIO threadIO() + { + return processor.threadIO; + } + + public CommandProcessor processor() + { + return processor; + } + + public ConcurrentMap<String, Object> getVariables() + { + return variables; + } + + public Path currentDir() + { + return currentDir; + } + + public void currentDir(Path path) + { + currentDir = path; + } + + public ClassLoader classLoader() + { + return classLoader != null ? classLoader : getClass().getClassLoader(); + } + + public void classLoader(ClassLoader classLoader) + { + this.classLoader = classLoader; + } + + public void close() + { + if (!this.closed) + { + for (WeakReference<Runnable> r : onClose) + { + Runnable runnable = r.get(); + if (runnable != null) + try + { + runnable.run(); + } + catch (Exception e) + { + // ignore, best effort + } + } + this.closed = true; + this.processor.closeSession(this); + executor.shutdownNow(); + } + } + + public Object execute(CharSequence commandline) throws Exception + { + assert processor != null; + + if (closed) + { + throw new IllegalStateException(SESSION_CLOSED); + } + + processor.beforeExecute(this, commandline); + + try + { + Closure impl = new Closure(this, null, commandline); + Object result = impl.execute(this, null); + processor.afterExecute(this, commandline, result); + return result; + } + catch (Exception e) + { + processor.afterExecute(this, commandline, e); + throw e; + } + } + + public InputStream getKeyboard() + { + return in; + } + + public Object get(String name) + { + // there is no API to list all variables, so overload name == null + if (name == null || VARIABLES.equals(name)) + { + return Collections.unmodifiableSet(variables.keySet()); + } + + if (COMMANDS.equals(name)) + { + return processor.getCommands(); + } + + if (CONSTANTS.equals(name)) + { + return Collections.unmodifiableSet(processor.constants.keySet()); + } + + Object val = processor.constants.get(name); + if (val != null) + { + return val; + } + + val = variables.get("#" + name); + if (val instanceof Function) + { + try + { + val = ((Function) val).execute(this, null); + } + catch (Exception e) + { + // Ignore + } + return val; + } + else if (val != null) + { + return val; + } + + val = variables.get(name); + if (val != null) + { + return val; + } + + return processor.getCommand(name, variables.get("SCOPE")); + } + + public Object put(String name, Object value) + { + if (value != null) + { + return variables.put(name, value); + } + else + { + return variables.remove(name); + } + } + + public PrintStream getConsole() + { + return pout; + } + + @SuppressWarnings("unchecked") + public CharSequence format(Object target, int level, Converter escape) throws Exception + { + if (target == null) + { + return "null"; + } + + if (target instanceof CharSequence) + { + return (CharSequence) target; + } + + for (Converter c : processor.converters) + { + CharSequence s = c.format(target, level, this); + if (s != null) + { + return s; + } + } + + if (target.getClass().isArray()) + { + if (target.getClass().getComponentType().isPrimitive()) + { + if (target.getClass().getComponentType() == boolean.class) + { + return Arrays.toString((boolean[]) target); } else { - if (level == Converter.LINE) - { - StringBuilder sb = new StringBuilder(); - Collection<?> c = (Collection<?>) target; - sb.append("["); - for (Object o : c) - { - if (sb.length() > 1) + if (target.getClass().getComponentType() == byte.class) + { + return Arrays.toString((byte[]) target); + } + else + { + if (target.getClass().getComponentType() == short.class) + { + return Arrays.toString((short[]) target); + } + else + { + if (target.getClass().getComponentType() == int.class) + { + return Arrays.toString((int[]) target); + } + else + { + if (target.getClass().getComponentType() == long.class) { - sb.append(", "); + return Arrays.toString((long[]) target); } - sb.append(format(o, level + 1, this)); - } - sb.append("]"); - return sb; - } - } - } - if (target instanceof Dictionary) - { - Map<Object, Object> result = new HashMap<>(); - for (Enumeration<Object> e = ((Dictionary<Object, Object>) target).keys(); e.hasMoreElements();) - { - Object key = e.nextElement(); - result.put(key, ((Dictionary<Object, Object>) target).get(key)); - } - target = result; - } - if (target instanceof Map) - { - if (level == Converter.INSPECT) - { - StringBuilder sb = new StringBuilder(); - Map<?, ?> c = (Map<?, ?>) target; - for (Map.Entry<?, ?> entry : c.entrySet()) - { - CharSequence key = format(entry.getKey(), level + 1, this); - sb.append(key); - for (int i = key.length(); i < 20; i++) - { - sb.append(' '); - } - sb.append(format(entry.getValue(), level + 1, this)); - sb.append("\n"); - } - return sb; - } - else - { - if (level == Converter.LINE) - { - StringBuilder sb = new StringBuilder(); - Map<?, ?> c = (Map<?, ?>) target; - sb.append("["); - for (Map.Entry<?, ?> entry : c.entrySet()) - { - if (sb.length() > 1) + else { - sb.append(", "); + if (target.getClass().getComponentType() == float.class) + { + return Arrays.toString((float[]) target); + } + else + { + if (target.getClass().getComponentType() == double.class) + { + return Arrays.toString((double[]) target); + } + else + { + if (target.getClass().getComponentType() == char.class) + { + return Arrays.toString((char[]) target); + } + } + } } - sb.append(format(entry, level + 1, this)); - } - sb.append("]"); - return sb; - } - } - } - if (target instanceof Path) - { - return target.toString(); - } - if (level == Converter.INSPECT) - { - return inspect(target); - } - else - { - return target.toString(); - } - } - - CharSequence inspect(Object b) - { - boolean found = false; - try (Formatter f = new Formatter();) - { - Method methods[] = b.getClass().getMethods(); - for (Method m : methods) - { - try - { - String name = m.getName(); - if (m.getName().startsWith("get") && !m.getName().equals("getClass") && m.getParameterTypes().length == 0 && Modifier.isPublic(m.getModifiers())) - { - found = true; - name = name.substring(3); - m.setAccessible(true); - Object value = m.invoke(b, (Object[]) null); - f.format(COLUMN, name, format(value, Converter.LINE, this)); - } - } - catch (Exception e) - { - // Ignore - } - } - if (found) - { - return (StringBuilder) f.out(); - } - else - { - return b.toString(); - } - } - } - - public Object convert(Class<?> desiredType, Object in) - { - return processor.convert(this, desiredType, in); - } - - public Object doConvert(Class<?> desiredType, Object in) - { - if (desiredType == Class.class) - { + } + } + } + } + } + target = Arrays.asList((Object[]) target); + } + if (target instanceof Collection) + { + if (level == Converter.INSPECT) + { + StringBuilder sb = new StringBuilder(); + Collection<?> c = (Collection<?>) target; + for (Object o : c) + { + sb.append(format(o, level + 1, this)); + sb.append("\n"); + } + return sb; + } + else + { + if (level == Converter.LINE) + { + StringBuilder sb = new StringBuilder(); + Collection<?> c = (Collection<?>) target; + sb.append("["); + for (Object o : c) + { + if (sb.length() > 1) + { + sb.append(", "); + } + sb.append(format(o, level + 1, this)); + } + sb.append("]"); + return sb; + } + } + } + if (target instanceof Dictionary) + { + Map<Object, Object> result = new HashMap<>(); + for (Enumeration<Object> e = ((Dictionary<Object, Object>) target).keys(); e.hasMoreElements();) + { + Object key = e.nextElement(); + result.put(key, ((Dictionary<Object, Object>) target).get(key)); + } + target = result; + } + if (target instanceof Map) + { + if (level == Converter.INSPECT) + { + StringBuilder sb = new StringBuilder(); + Map<?, ?> c = (Map<?, ?>) target; + for (Map.Entry<?, ?> entry : c.entrySet()) + { + CharSequence key = format(entry.getKey(), level + 1, this); + sb.append(key); + for (int i = key.length(); i < 20; i++) + { + sb.append(' '); + } + sb.append(format(entry.getValue(), level + 1, this)); + sb.append("\n"); + } + return sb; + } + else + { + if (level == Converter.LINE) + { + StringBuilder sb = new StringBuilder(); + Map<?, ?> c = (Map<?, ?>) target; + sb.append("["); + for (Map.Entry<?, ?> entry : c.entrySet()) + { + if (sb.length() > 1) + { + sb.append(", "); + } + sb.append(format(entry, level + 1, this)); + } + sb.append("]"); + return sb; + } + } + } + if (target instanceof Path) + { + return target.toString(); + } + if (level == Converter.INSPECT) + { + return inspect(target); + } + else + { + return target.toString(); + } + } + + CharSequence inspect(Object b) + { + boolean found = false; + try (Formatter f = new Formatter();) + { + Method methods[] = b.getClass().getMethods(); + for (Method m : methods) + { try { - return Class.forName(in.toString(), true, classLoader()); - } - catch (ClassNotFoundException e) - { - return null; - } - } - return processor.doConvert(desiredType, in); - } - - public CharSequence format(Object result, int inspect) - { - try - { - return format(result, inspect, this); - } - catch (Exception e) - { - return "<can not format " + result + ":" + e; - } - } - - public Object expr(CharSequence expr) - { - return processor.expr(this, expr); - } - - public Object invoke(Object target, String name, List<Object> args) throws Exception - { - return processor.invoke(this, target, name, args); - } - - public Path redirect(Path path, int mode) - { - return processor.redirect(this, path, mode); - } - - @Override - public List<Job> jobs() - { - synchronized (jobs) - { - return Collections.<Job>unmodifiableList(jobs); - } - } - - public static JobImpl currentJob() - { - return (JobImpl) Job.Utils.current(); - } - - @Override - public JobImpl foregroundJob() - { - List<JobImpl> jobs; - synchronized (this.jobs) - { - jobs = new ArrayList<>(this.jobs); - } - for (JobImpl j : jobs) { - if (j.parent == null && j.status() == Status.Foreground) { - return j; - } - } - return null; - } - - @Override - public void setJobListener(JobListener listener) - { - synchronized (jobs) - { - jobListener = listener; - } - } - - public JobImpl createJob(CharSequence command) - { - synchronized (jobs) - { - int id = 1; - - boolean found; - do - { - found = false; - for (Job job : jobs) - { - if (job.id() == id) - { - found = true; - id++; - break; - } - } - } - while (found); - - JobImpl cur = currentJob(); - JobImpl job = new JobImpl(id, cur, command); - if (cur == null) - { - jobs.add(job); - } - else - { - cur.add(job); - } - return job; - } - } - - class JobImpl implements Job, Runnable - { - private final int id; - private final JobImpl parent; - private final CharSequence command; - private final List<Pipe> pipes = new ArrayList<>(); - private final List<Job> children = new ArrayList<>(); - private Status status = Status.Created; - private Future<?> future; - private Result result; - - public JobImpl(int id, JobImpl parent, CharSequence command) - { - this.id = id; - this.parent = parent; - this.command = command; - } - - void addPipe(Pipe pipe) - { - pipes.add(pipe); - } - - @Override - public int id() - { - return id; - } - - public CharSequence command() - { - return command; - } - - @Override - public synchronized Status status() - { - return status; - } - - @Override - public synchronized void suspend() - { - if (status == Status.Done) - { - throw new IllegalStateException("Job is finished"); - } - if (status != Status.Suspended) - { - setStatus(Status.Suspended); - } - } - - @Override - public synchronized void background() - { - if (status == Status.Done) - { - throw new IllegalStateException("Job is finished"); - } - if (status != Status.Background) - { - setStatus(Status.Background); - } - } - - @Override - public synchronized void foreground() - { - if (status == Status.Done) - { - throw new IllegalStateException("Job is finished"); - } - JobImpl cr = CommandSessionImpl.currentJob(); - JobImpl fg = foregroundJob(); - if (parent == null && fg != null && fg != this && fg != cr) - { - throw new IllegalStateException("A job is already in foreground"); - } - if (status != Status.Foreground) - { - setStatus(Status.Foreground); - } - } - - @Override - public void interrupt() - { - Future<?> future; - List<Job> children; - synchronized (this) - { - future = this.future; - } - synchronized (this.children) - { - children = new ArrayList<>(this.children); - } - for (Job job : children) - { - job.interrupt(); - } - if (future != null) - { - future.cancel(true); - } - } - - protected synchronized void done() - { - if (status == Status.Done) - { - throw new IllegalStateException("Job is finished"); - } - setStatus(Status.Done); - } - - private void setStatus(Status newStatus) - { - setStatus(newStatus, true); - } - - private void setStatus(Status newStatus, boolean callListeners) - { - Status previous; - synchronized (this) - { - previous = this.status; - status = newStatus; - } - if (callListeners) - { - JobListener listener; - synchronized (jobs) - { - listener = jobListener; - if (newStatus == Status.Done) - { - jobs.remove(this); - } - } - if (listener != null) - { - listener.jobChanged(this, previous, newStatus); - } - } - synchronized (this) - { - JobImpl.this.notifyAll(); - } - } - - @Override - public synchronized Result result() - { - return result; - } - - @Override - public Job parent() - { - return parent; - } - - /** - * Start the job. - * If the job is started in foreground, - * waits for the job to finish or to be - * suspended or moved to background. - * - * @param status the desired job status - * @return <code>null</code> if the job - * has been suspended or moved to background, - * - */ - public synchronized Result start(Status status) throws InterruptedException - { - if (status == Status.Created || status == Status.Done) - { - throw new IllegalArgumentException("Illegal start status"); - } - if (this.status != Status.Created) - { - throw new IllegalStateException("Job already started"); - } - switch (status) - { - case Suspended: - suspend(); - break; - case Background: - background(); - break; - case Foreground: - foreground(); - break; - case Created: - case Done: - } - future = executor.submit(this); - while (this.status == Status.Foreground) - { - JobImpl.this.wait(); - } - return result; - } - - public List<Process> processes() - { - return Collections.unmodifiableList((List<? extends Process>)pipes); - } - - @Override - public CommandSession session() - { - return CommandSessionImpl.this; - } - - public void run() - { - Thread thread = Thread.currentThread(); - String name = thread.getName(); - try - { - thread.setName("job controller " + id); - - List<Callable<Result>> wrapped = new ArrayList<Callable<Result>>(pipes); - List<Future<Result>> results = executor.invokeAll(wrapped); - - // Get pipe exceptions - Exception pipeException = null; - for (int i = 0; i < results.size() - 1; i++) - { - Future<Result> future = results.get(i); - Throwable e; - try - { - Result r = future.get(); - e = r.exception; - } - catch (ExecutionException ee) - { - e = ee.getCause(); - } - if (e != null) - { - if (pipeException == null) - { - pipeException = new Exception("Exception caught during pipe execution"); - } - pipeException.addSuppressed(e); - } - } - put(Closure.PIPE_EXCEPTION, pipeException); - - result = results.get(results.size() - 1).get(); + String name = m.getName(); + if (m.getName().startsWith("get") && !m.getName().equals("getClass") && m.getParameterTypes().length == 0 + && Modifier.isPublic(m.getModifiers())) + { + found = true; + name = name.substring(3); + m.setAccessible(true); + Object value = m.invoke(b, (Object[]) null); + f.format(COLUMN, name, format(value, Converter.LINE, this)); + } } catch (Exception e) { - result = new Result(e); - } - catch (Throwable t) - { - result = new Result(new ExecutionException(t)); - } - finally - { - done(); - thread.setName(name); - } - } - - public void add(Job child) - { - synchronized (children) - { - children.add(child); - } - } - } + // Ignore + } + } + if (found) + { + return (StringBuilder) f.out(); + } + else + { + return b.toString(); + } + } + } + + public Object convert(Class<?> desiredType, Object in) + { + return processor.convert(this, desiredType, in); + } + + public Object doConvert(Class<?> desiredType, Object in) + { + if (desiredType == Class.class) + { + try + { + return Class.forName(in.toString(), true, classLoader()); + } + catch (ClassNotFoundException e) + { + return null; + } + } + return processor.doConvert(desiredType, in); + } + + public CharSequence format(Object result, int inspect) + { + try + { + return format(result, inspect, this); + } + catch (Exception e) + { + return "<can not format " + result + ":" + e; + } + } + + public Object expr(CharSequence expr) + { + return processor.expr(this, expr); + } + + public Object invoke(Object target, String name, List<Object> args) throws Exception + { + return processor.invoke(this, target, name, args); + } + + public Path redirect(Path path, int mode) + { + return processor.redirect(this, path, mode); + } + + @Override + public List<Job> jobs() + { + synchronized (jobs) + { + return Collections.<Job>unmodifiableList(jobs); + } + } + + public static JobImpl currentJob() + { + return (JobImpl) Job.Utils.current(); + } + + @Override + public JobImpl foregroundJob() + { + List<JobImpl> jobs; + synchronized (this.jobs) + { + jobs = new ArrayList<>(this.jobs); + } + for (JobImpl j : jobs) + { + if (j.parent == null && j.status() == Status.Foreground) + { + return j; + } + } + return null; + } + + @Override + public void setJobListener(JobListener listener) + { + synchronized (jobs) + { + jobListener = listener; + } + } + + public JobImpl createJob(CharSequence command) + { + synchronized (jobs) + { + int id = 1; + + boolean found; + do + { + found = false; + for (Job job : jobs) + { + if (job.id() == id) + { + found = true; + id++; + break; + } + } + } + while (found); + + JobImpl cur = currentJob(); + JobImpl job = new JobImpl(id, cur, command); + if (cur == null) + { + jobs.add(job); + } + else + { + cur.add(job); + } + return job; + } + } + + class JobImpl implements Job, Runnable + { + private final int id; + private final JobImpl parent; + private final CharSequence command; + private final List<Pipe> pipes = new ArrayList<>(); + private final List<Job> children = new ArrayList<>(); + private Status status = Status.Created; + private Future<?> future; + private Result result; + + public JobImpl(int id, JobImpl parent, CharSequence command) + { + this.id = id; + this.parent = parent; + this.command = command; + } + + void addPipe(Pipe pipe) + { + pipes.add(pipe); + } + + @Override + public int id() + { + return id; + } + + public CharSequence command() + { + return command; + } + + @Override + public synchronized Status status() + { + return status; + } + + @Override + public synchronized void suspend() + { + if (status == Status.Done) + { + throw new IllegalStateException("Job is finished"); + } + if (status != Status.Suspended) + { + setStatus(Status.Suspended); + } + } + + @Override + public synchronized void background() + { + if (status == Status.Done) + { + throw new IllegalStateException("Job is finished"); + } + if (status != Status.Background) + { + setStatus(Status.Background); + } + } + + @Override + public synchronized void foreground() + { + if (status == Status.Done) + { + throw new IllegalStateException("Job is finished"); + } + JobImpl cr = CommandSessionImpl.currentJob(); + JobImpl fg = foregroundJob(); + if (parent == null && fg != null && fg != this && fg != cr) + { + throw new IllegalStateException("A job is already in foreground"); + } + if (status != Status.Foreground) + { + setStatus(Status.Foreground); + } + } + + @Override + public void interrupt() + { + Future<?> future; + List<Job> children; + synchronized (this) + { + future = this.future; + } + synchronized (this.children) + { + children = new ArrayList<>(this.children); + } + for (Job job : children) + { + job.interrupt(); + } + if (future != null) + { + future.cancel(true); + } + } + + protected synchronized void done() + { + if (status == Status.Done) + { + throw new IllegalStateException("Job is finished"); + } + setStatus(Status.Done); + } + + private void setStatus(Status newStatus) + { + setStatus(newStatus, true); + } + + private void setStatus(Status newStatus, boolean callListeners) + { + Status previous; + synchronized (this) + { + previous = this.status; + status = newStatus; + } + if (callListeners) + { + JobListener listener; + synchronized (jobs) + { + listener = jobListener; + if (newStatus == Status.Done) + { + jobs.remove(this); + } + } + if (listener != null) + { + listener.jobChanged(this, previous, newStatus); + } + } + synchronized (this) + { + JobImpl.this.notifyAll(); + } + } + + @Override + public synchronized Result result() + { + return result; + } + + @Override + public Job parent() + { + return parent; + } + + /** + * Start the job. If the job is started in foreground, waits for the job to finish or to be suspended or moved to + * background. + * + * @param status the desired job status + * @return <code>null</code> if the job has been suspended or moved to background, + * + */ + public synchronized Result start(Status status) throws InterruptedException + { + if (status == Status.Created || status == Status.Done) + { + throw new IllegalArgumentException("Illegal start status"); + } + if (this.status != Status.Created) + { + throw new IllegalStateException("Job already started"); + } + switch (status) + { + case Suspended: + suspend(); + break; + case Background: + background(); + break; + case Foreground: + foreground(); + break; + case Created: + case Done: + } + future = executor.submit(this); + while (this.status == Status.Foreground) + { + JobImpl.this.wait(); + } + return result; + } + + public List<Process> processes() + { + return Collections.unmodifiableList((List<? extends Process>) pipes); + } + + @Override + public CommandSession session() + { + return CommandSessionImpl.this; + } + + public void run() + { + Thread thread = Thread.currentThread(); + String name = thread.getName(); + try + { + thread.setName("job controller " + id); + + List<Callable<Result>> wrapped = new ArrayList<Callable<Result>>(pipes); + List<Future<Result>> results = executor.invokeAll(wrapped); + + // Get pipe exceptions + Exception pipeException = null; + for (int i = 0; i < results.size() - 1; i++) + { + Future<Result> future = results.get(i); + Throwable e; + try + { + Result r = future.get(); + e = r.exception; + } + catch (ExecutionException ee) + { + e = ee.getCause(); + } + if (e != null) + { + if (pipeException == null) + { + pipeException = new Exception("Exception caught during pipe execution"); + } + pipeException.addSuppressed(e); + } + } + put(Closure.PIPE_EXCEPTION, pipeException); + + result = results.get(results.size() - 1).get(); + } + catch (Exception e) + { + result = new Result(e); + } + catch (Throwable t) + { + result = new Result(new ExecutionException(t)); + } + finally + { + done(); + thread.setName(name); + } + } + + public void add(Job child) + { + synchronized (children) + { + children.add(child); + } + } + } + + /** + * Register a runnable that is run when the session is closed. This should be a quick running method. + * <p> + * The session will maintain a weak reference to this runnable. This means that if you do not hold a reference to it, + * it will be garbage collected to prevent class leaks when bundles are stopped. So make sure to hold a strong reference. + */ + @Override + public void onClose(Runnable runnable) + { + onClose.add(new WeakReference<>(runnable)); + } } diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/Activator.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/Activator.java index 7f4f2420e0..07f67372dd 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/Activator.java +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/Activator.java @@ -18,6 +18,7 @@ */ package org.apache.felix.gogo.runtime.activator; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -27,10 +28,12 @@ import org.apache.felix.gogo.runtime.CommandProcessorImpl; import org.apache.felix.gogo.runtime.CommandProxy; +import org.apache.felix.gogo.runtime.systemio.SystemIOImpl; import org.apache.felix.gogo.runtime.threadio.ThreadIOImpl; import org.apache.felix.service.command.CommandSessionListener; import org.apache.felix.service.command.CommandProcessor; import org.apache.felix.service.command.Converter; +import org.apache.felix.service.systemio.SystemIO; import org.apache.felix.service.threadio.ThreadIO; import org.osgi.annotation.bundle.Header; import org.osgi.framework.BundleActivator; @@ -46,16 +49,19 @@ public class Activator implements BundleActivator { protected CommandProcessorImpl processor; + private SystemIOImpl systemio; private ThreadIOImpl threadio; private ServiceTracker<?,?> commandTracker; private ServiceTracker<?,?> converterTracker; private ServiceTracker<?,?> listenerTracker; private ServiceRegistration<?> processorRegistration; private ServiceRegistration<?> threadioRegistration; + private ServiceRegistration<?> systemioRegistration; + private ServiceFacade<SystemIO> systemioFacade; public static final String CONTEXT = ".context"; - protected ServiceRegistration<?> newProcessor(ThreadIO tio, BundleContext context) + protected ServiceRegistration<?> newProcessor(ThreadIO tio, SystemIO sio, BundleContext context) { processor = new CommandProcessorImpl(tio); try @@ -78,11 +84,22 @@ protected ServiceRegistration<?> newProcessor(ThreadIO tio, BundleContext contex public void start(final BundleContext context) throws Exception { - threadio = new ThreadIOImpl(); + long timeout = toLong(context.getProperty(SystemIO.TIMEOUT)); + if ( timeout <= 0 ) { + systemio = new SystemIOImpl(); + systemio.start(); + systemioRegistration = context.registerService(SystemIO.class.getName(), systemio, null); + threadio = new ThreadIOImpl(systemio); + } else { + systemioFacade = new ServiceFacade<>(SystemIO.class, context, timeout); + SystemIO systemio = systemioFacade.get(); + threadio = new ThreadIOImpl(systemio); + } threadio.start(); threadioRegistration = context.registerService(ThreadIO.class.getName(), threadio, null); - processorRegistration = newProcessor(threadio, context); + + processorRegistration = newProcessor(threadio, systemio, context); commandTracker = trackOSGiCommands(context); commandTracker.open(); @@ -134,6 +151,19 @@ public void stop(BundleContext context) { listenerTracker.close(); threadio.stop(); processor.stop(); + if( systemioRegistration != null) { + systemioRegistration.unregister(); + systemio.stop(); + } else { + try + { + systemioFacade.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } } private ServiceTracker<?,?> trackOSGiCommands(final BundleContext context) @@ -212,5 +242,16 @@ public void removedService(ServiceReference<Object> reference, List<Object> serv } }; } + + private long toLong(String v) { + if ( v != null) { + try { + return Long.parseLong(v); + } catch( NumberFormatException nfe) { + // ignore + } + } + return Long.MIN_VALUE; + } } diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/ServiceFacade.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/ServiceFacade.java new file mode 100644 index 0000000000..61b697c698 --- /dev/null +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/ServiceFacade.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime.activator; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceException; +import org.osgi.util.tracker.ServiceTracker; + +public class ServiceFacade<S> implements Closeable +{ + final ServiceTracker<S,S> tracker; + final S facade; + + @SuppressWarnings("unchecked") + public ServiceFacade(final Class<S> clazz, BundleContext context, final long timeout) + { + this.tracker = new ServiceTracker<>(context, clazz, null); + this.tracker.open(); + this.facade = (S) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] {clazz}, new InvocationHandler() { + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + { + try + { + S s = tracker.waitForService(timeout); + if ( s == null) + throw new ServiceException( "No such service " + clazz.getName() + ", waited " + timeout + "ms"); + return method.invoke(s, args); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + }); + } + + public S get() { + return facade; + } + + @Override + public void close() throws IOException + { + tracker.close(); + } +} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/DelegateStream.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/DelegateStream.java new file mode 100644 index 0000000000..3026fd3594 --- /dev/null +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/DelegateStream.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime.systemio; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +class DelegateStream extends OutputStream +{ + + final List<OutputStream> outs; + + + DelegateStream(List<OutputStream> outs) + { + this.outs = outs; + } + + @Override + public void write(int b) throws IOException + { + for ( OutputStream out : outs) try { + out.write(b); + } catch( Exception e) { + // ignore + } + } + + @Override + public void write(byte b[]) throws IOException + { + write(b, 0, b.length); + } + + @Override + public void write(byte b[], int off, int len) throws IOException + { + for ( OutputStream out : outs) try { + out.write(b, off, len); + } catch( Exception e) { + // ignore + } + } + + + @Override + public void flush() + { + for ( OutputStream out : outs) try { + out.flush(); + } catch( Exception e) { + // ignore + } + + } + +} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/SystemIOImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/SystemIOImpl.java new file mode 100644 index 0000000000..17a95dfebc --- /dev/null +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/SystemIOImpl.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime.systemio; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.logging.Logger; + +import org.apache.felix.service.systemio.SystemIO; +import org.osgi.annotation.bundle.Capability; +import org.osgi.namespace.service.ServiceNamespace; + +/** + * Implement the SystemIO API to allow the System streams to be overridden + */ +@Capability(namespace = ServiceNamespace.SERVICE_NAMESPACE, + attribute = "objectClass='org.apache.felix.service.systemio.SystemIO'") +public class SystemIOImpl extends InputStream implements SystemIO +{ + static private final Logger log = Logger.getLogger(SystemIOImpl.class.getName()); + + final List<InputStream> stdins = new CopyOnWriteArrayList<>(); + final List<OutputStream> stdouts = new CopyOnWriteArrayList<>(); + final List<OutputStream> stderrs = new CopyOnWriteArrayList<>(); + + final InputStream in = System.in; + final PrintStream out = System.out; + final PrintStream err = System.err; + + private PrintStream rout; + private PrintStream rerr; + + + public void start() + { + stdins.add(in); + stdouts.add(out); + stderrs.add(err); + rout = new PrintStream(new DelegateStream(stdouts), true); + rerr = new PrintStream(new DelegateStream(stderrs), true); + System.setOut(rout); + System.setErr(rerr); + System.setIn(this); + } + + public void stop() + { + if (System.in == this) + { + System.setIn(in); + } + else + { + log.warning("conflict: the dispatching input stream was replaced"); + } + if (System.out == rout) + { + System.setOut(out); + } + else + { + log.warning("conflict: the dispatching stdout stream was replaced"); + } + if (System.err == rerr) + { + System.setErr(err); + } + else + { + log.warning("conflict: the dispatching stderr stream was replaced"); + } + } + + @Override + public Closeable system(final InputStream stdin, final OutputStream stdout, final OutputStream stderr) + { + if (stdin != null && stdin != System.in && stdin != in) + { + stdins.add(0,stdin); + } + if (stdout != null) + stdouts.add(stdout); + if (stderr != null) + stderrs.add(stderr); + return new Closeable() + { + @Override + public void close() throws IOException + { + if (stdin != null && stdin != System.in && stdin != in) + { + int inInFront = stdins.indexOf(stdin); + assert inInFront >= 0; + stdins.remove(inInFront); + assert stdins.size() > 0; + } + if (stdout != null) + stdouts.remove(stdout); + if (stderr != null) + stderrs.remove(stderr); + } + }; + } + + @Override + public int read() throws IOException + { + assert stdins.size() > 0; + for ( InputStream in : stdins) { + int b = in.read(); + if ( b != SystemIO.NO_DATA) + return b; + } + return -1; // unreachable because stdin is at the end + } + +} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/Marker.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/Marker.java deleted file mode 100644 index 84dc7cef8e..0000000000 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/Marker.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.felix.gogo.runtime.threadio; - -import java.io.InputStream; -import java.io.PrintStream; - -public class Marker -{ - final Marker previous; - InputStream in; - PrintStream out; - PrintStream err; - volatile boolean deactivated; - - public Marker(InputStream in, PrintStream out, PrintStream err, Marker previous) - { - this.previous = previous; - this.in = in; - this.out = out; - this.err = err; - } - - public InputStream getIn() - { - return deactivated ? previous.getIn() : in; - } - - public PrintStream getOut() - { - return deactivated ? previous.getOut() : out; - } - - public PrintStream getErr() - { - return deactivated ? previous.getErr() : err; - } - - void deactivate() - { - deactivated = true; - // Set to null for garbage collection - in = null; - out = null; - err = null; - } -} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadIOImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadIOImpl.java index 60d4ddfdba..b6e659486a 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadIOImpl.java +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadIOImpl.java @@ -19,120 +19,149 @@ // DWB20: ThreadIO should check and reset IO if something (e.g. jetty) overrides package org.apache.felix.gogo.runtime.threadio; +import java.io.Closeable; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.PrintStream; import java.util.logging.Logger; +import org.apache.felix.service.systemio.SystemIO; import org.apache.felix.service.threadio.ThreadIO; import org.osgi.annotation.bundle.Capability; import org.osgi.namespace.service.ServiceNamespace; -@Capability( - namespace = ServiceNamespace.SERVICE_NAMESPACE, - attribute = "objectClass='org.apache.felix.service.threadio.ThreadIO'" -) -public class ThreadIOImpl implements ThreadIO +@Capability(namespace = ServiceNamespace.SERVICE_NAMESPACE, + attribute = "objectClass='org.apache.felix.service.threadio.ThreadIO'") +public class ThreadIOImpl extends InputStream implements ThreadIO { - static private final Logger log = Logger.getLogger(ThreadIOImpl.class.getName()); - - final Marker defaultMarker = new Marker(System.in, System.out, System.err, null); - final ThreadPrintStream err = new ThreadPrintStream(this, System.err, true); - final ThreadPrintStream out = new ThreadPrintStream(this, System.out, false); - final ThreadInputStream in = new ThreadInputStream(this, System.in); - final ThreadLocal<Marker> current = new InheritableThreadLocal<Marker>() - { - @Override - protected Marker initialValue() - { - return defaultMarker; - } - }; - - public void start() - { - if (System.out instanceof ThreadPrintStream) - { - throw new IllegalStateException("Thread Print Stream already set"); - } - System.setOut(out); - System.setIn(in); - System.setErr(err); - } - - public void stop() - { - System.setErr(defaultMarker.err); - System.setOut(defaultMarker.out); - System.setIn(defaultMarker.in); - } - - private void checkIO() - { // derek - if (System.in != in) - { - log.fine("ThreadIO: eek! who's set System.in=" + System.in); - System.setIn(in); - } - - if (System.out != out) - { - log.fine("ThreadIO: eek! who's set System.out=" + System.out); - System.setOut(out); - } - - if (System.err != err) - { - log.fine("ThreadIO: eek! who's set System.err=" + System.err); - System.setErr(err); - } - } - - Marker current() - { - Marker m = current.get(); - if (m.deactivated) - { - while (m.deactivated) + static final Logger log = Logger.getLogger(ThreadIOImpl.class.getName()); + final SystemIO systemio; + final ThreadLocal<Streams> threadLocal = new ThreadLocal<>(); + Closeable system; + + class Streams + { + final PrintStream out; + final PrintStream err; + final InputStream in; + Streams prev; + + Streams(InputStream in, PrintStream out, PrintStream err) + { + this.in = in; + this.out = out; + this.err = err; + } + + } + + abstract class ThreadOutStream extends OutputStream + { + @Override + public void write(int b) throws IOException + { + Streams streams = threadLocal.get(); + if (streams == null) + return; + + get(streams).write(b); + } + + @Override + public void write(byte[] b) throws IOException + { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + Streams streams = threadLocal.get(); + if (streams == null) + return; + + get(streams).write(b, off, len); + } + + abstract OutputStream get(Streams s); + } + + + public ThreadIOImpl(SystemIO systemio) + { + this.systemio = systemio; + } + + public void start() + {} + + public void stop() + { + if (system != null) + try + { + system.close(); + } + catch (IOException e) + { + // ignore + } + } + + public void close() + { + Streams streams = threadLocal.get(); + if (streams != null) + threadLocal.set(streams.prev); + } + + public void setStreams(InputStream in, PrintStream out, PrintStream err) + { + init(); + Streams s = new Streams(in, out, err); + s.prev = threadLocal.get(); + threadLocal.set(s); + } + + private synchronized void init() + { + if (system == null) + { + system = systemio.system(this, new ThreadOutStream() + { + + @Override + OutputStream get(Streams s) { - m = m.previous; + return s.out; } - current.set(m); - } - return m; - } - - public void close() - { - checkIO(); // derek - Marker top = this.current.get(); - if (top == null) - { - throw new IllegalStateException("No thread io active"); - } - if (top != defaultMarker) - { - top.deactivate(); - this.current.set(top.previous); - } - } - - public void setStreams(InputStream in, PrintStream out, PrintStream err) - { - assert in != null; - assert out != null; - assert err != null; - checkIO(); // derek - Marker prev = current(); - if (in == this.in) { - in = prev.getIn(); - } - if (out == this.out) { - out = prev.getOut(); - } - if (err == this.err) { - err = prev.getErr(); - } - Marker marker = new Marker(in, out, err, prev); - this.current.set(marker); - } + + }, new ThreadOutStream() + { + + @Override + OutputStream get(Streams s) + { + return s.err; + } + }); + } + } + + @Override + public int read() throws IOException + { + Streams s = threadLocal.get(); + while (s != null) + { + if (s.in == null) + s = s.prev; + else + { + return s.in.read(); + } + } + return SystemIO.NO_DATA; + } } diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadInputStream.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadInputStream.java deleted file mode 100644 index af86a316f0..0000000000 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadInputStream.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.felix.gogo.runtime.threadio; - -import java.io.IOException; -import java.io.InputStream; - -public class ThreadInputStream extends InputStream -{ - final InputStream dflt; - final ThreadIOImpl io; - - public ThreadInputStream(ThreadIOImpl threadIO, InputStream in) - { - io = threadIO; - dflt = in; - } - - private InputStream getCurrent() - { - Marker marker = io.current(); - return marker.getIn(); - } - - /** - * Access to the root stream through reflection - * @return InputStream - */ - public InputStream getRoot() - { - return dflt; - } - - // - // Delegate methods - // - - public int read() throws IOException - { - return getCurrent().read(); - } - - public int read(byte[] b) throws IOException - { - return getCurrent().read(b); - } - - public int read(byte[] b, int off, int len) throws IOException - { - return getCurrent().read(b, off, len); - } - - public long skip(long n) throws IOException - { - return getCurrent().skip(n); - } - - public int available() throws IOException - { - return getCurrent().available(); - } - - public void close() throws IOException - { - getCurrent().close(); - } - - public void mark(int readlimit) - { - getCurrent().mark(readlimit); - } - - public void reset() throws IOException - { - getCurrent().reset(); - } - - public boolean markSupported() - { - return getCurrent().markSupported(); - } -} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadPrintStream.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadPrintStream.java deleted file mode 100644 index decca062fe..0000000000 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadPrintStream.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.felix.gogo.runtime.threadio; - -import java.io.IOException; -import java.io.PrintStream; -import java.util.Locale; - -public class ThreadPrintStream extends PrintStream -{ - final PrintStream dflt; - final ThreadIOImpl io; - final boolean errorStream; - - public ThreadPrintStream(ThreadIOImpl threadIO, PrintStream out, boolean error) - { - super(out); - dflt = out; - io = threadIO; - errorStream = error; - } - - public PrintStream getCurrent() - { - Marker marker = io.current(); - return errorStream ? marker.getErr() : marker.getOut(); - } - - /** - * Access to the root stream through reflection - */ - public PrintStream getRoot() - { - return dflt; - } - - // - // Delegate methods - // - - public void flush() - { - getCurrent().flush(); - } - - public void close() - { - getCurrent().close(); - } - - public boolean checkError() - { - return getCurrent().checkError(); - } - - public void setError() - { - // getCurrent().setError(); - } - - public void clearError() - { - // getCurrent().clearError(); - } - - public void write(int b) - { - getCurrent().write(b); - } - - public void write(byte[] buf, int off, int len) - { - getCurrent().write(buf, off, len); - } - - public void print(boolean b) - { - getCurrent().print(b); - } - - public void print(char c) - { - getCurrent().print(c); - } - - public void print(int i) - { - getCurrent().print(i); - } - - public void print(long l) - { - getCurrent().print(l); - } - - public void print(float f) - { - getCurrent().print(f); - } - - public void print(double d) - { - getCurrent().print(d); - } - - public void print(char[] s) - { - getCurrent().print(s); - } - - public void print(String s) - { - getCurrent().print(s); - } - - public void print(Object obj) - { - getCurrent().print(obj); - } - - public void println() - { - getCurrent().println(); - } - - public void println(boolean x) - { - getCurrent().println(x); - } - - public void println(char x) - { - getCurrent().println(x); - } - - public void println(int x) - { - getCurrent().println(x); - } - - public void println(long x) - { - getCurrent().println(x); - } - - public void println(float x) - { - getCurrent().println(x); - } - - public void println(double x) - { - getCurrent().println(x); - } - - public void println(char[] x) - { - getCurrent().println(x); - } - - public void println(String x) - { - getCurrent().println(x); - } - - public void println(Object x) - { - getCurrent().println(x); - } - - public PrintStream printf(String format, Object... args) - { - return getCurrent().printf(format, args); - } - - public PrintStream printf(Locale l, String format, Object... args) - { - return getCurrent().printf(l, format, args); - } - - public PrintStream format(String format, Object... args) - { - return getCurrent().format(format, args); - } - - public PrintStream format(Locale l, String format, Object... args) - { - return getCurrent().format(l, format, args); - } - - public PrintStream append(CharSequence csq) - { - return getCurrent().append(csq); - } - - public PrintStream append(CharSequence csq, int start, int end) - { - return getCurrent().append(csq, start, end); - } - - public PrintStream append(char c) - { - return getCurrent().append(c); - } - - public void write(byte[] b) throws IOException - { - getCurrent().write(b); - } -} diff --git a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java index e4e6df1800..7082d2e352 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java +++ b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java @@ -111,6 +111,12 @@ public interface CommandSession extends AutoCloseable * @return Object */ Object convert(Class<?> type, Object instance); + + /** + * When this session is stopped, execute the runnable. + * @param runnable the runnable to run + */ + void onClose( Runnable runnable); // // Job support diff --git a/gogo/runtime/src/main/java/org/apache/felix/service/systemio/SystemIO.java b/gogo/runtime/src/main/java/org/apache/felix/service/systemio/SystemIO.java new file mode 100644 index 0000000000..c02ab2d1a1 --- /dev/null +++ b/gogo/runtime/src/main/java/org/apache/felix/service/systemio/SystemIO.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.service.systemio; + +import java.io.Closeable; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * A simple service to listen to the system streams. The System.out and System.err writes will be dispatched to all + * registered listeners as well as the original streams. If a read is done on System.in, the last registered will handle + * the read. If no one is registered, the original System.in is used. + * <p> + * The purpose of this service is to share the System.in, System.out, and System.err singletons. + * <p> + * Implementations must warn if someone else overrides the System.xxx streams and not reset them if this happens. + * + */ +public interface SystemIO +{ + /** + * A framework property signalling that Gogo should use an external SystemIO service instead of its build in one. The + * property value must be the number of milliseconds to wait for this external service. Any value <=0 or not numeric + * will result in using the internal implementation. + */ + + String TIMEOUT = "org.apache.felix.gogo.systemio.timeout"; + + /** + * An input stream can return this from {@link InputStream#read()} when it has no data. This should force the + * implementation to look for another input stream. + */ + int NO_DATA = -42; + + /** + * Register overrides for the System streams. If a stream is null, it will not be registered. The stdin InputStream + * can return {@link #NO_DATA} if it does not want to be used as the last input stream. This can be used to filter + * for example by the current thread. + * + * @param stdin the System.in handler or null. + * @param stdout the System.out listener + * @param stderr the System.err listener + * @return a closeable that when closed will unregister the streams + */ + Closeable system(InputStream stdin, OutputStream stdout, OutputStream stderr); +} diff --git a/gogo/runtime/src/main/java/org/apache/felix/service/threadio/ThreadIO.java b/gogo/runtime/src/main/java/org/apache/felix/service/threadio/ThreadIO.java index 08b04f53aa..b408433b91 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/service/threadio/ThreadIO.java +++ b/gogo/runtime/src/main/java/org/apache/felix/service/threadio/ThreadIO.java @@ -21,6 +21,8 @@ import java.io.InputStream; import java.io.PrintStream; +import org.apache.felix.service.systemio.SystemIO; + /** * Enable multiplexing of the standard IO streams for input, output, and error. * <p> diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java index 323842588b..eb88e11b33 100644 --- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import java.io.PrintStream; +import org.apache.felix.gogo.runtime.systemio.SystemIOImpl; import org.apache.felix.gogo.runtime.threadio.ThreadIOImpl; import org.junit.After; import org.junit.Before; @@ -33,19 +34,23 @@ public abstract class AbstractParserTest { private InputStream sin; private PrintStream sout; private PrintStream serr; + private SystemIOImpl systemIO; @Before public void setUp() { sin = new NoCloseInputStream(System.in); sout = new NoClosePrintStream(System.out); serr = new NoClosePrintStream(System.err); - threadIO = new ThreadIOImpl(); + systemIO = new SystemIOImpl(); + systemIO.start(); + threadIO = new ThreadIOImpl(systemIO); threadIO.start(); } @After public void tearDown() { threadIO.stop(); + systemIO.stop(); } public class Context extends org.apache.felix.gogo.runtime.Context { diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/CommandSessionTest.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/CommandSessionTest.java new file mode 100644 index 0000000000..4cc5a08545 --- /dev/null +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/CommandSessionTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + + +public class CommandSessionTest +{ + + @Test + public void onCloseTest() + { + CommandProcessorImpl processor = new CommandProcessorImpl(null); + ByteArrayInputStream bais = new ByteArrayInputStream("".getBytes()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final AtomicInteger b = new AtomicInteger(0); + try (CommandSessionImpl session = processor.createSession(bais, baos, baos)) + { + + + Runnable runnable = new Runnable() + { + + @Override + public void run() + { + b.incrementAndGet(); + } + }; + session.onClose(runnable); + session.onClose(runnable); + + assertEquals(0,b.get()); + System.gc(); + } + + assertEquals(2,b.get()); + } + + @Test + public void onCloseTestGc() + { + CommandProcessorImpl processor = new CommandProcessorImpl(null); + ByteArrayInputStream bais = new ByteArrayInputStream("".getBytes()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final AtomicInteger b = new AtomicInteger(0); + try (CommandSessionImpl session = processor.createSession(bais, baos, baos)) + { + + + session.onClose(new Runnable() + { + + @Override + public void run() + { + b.incrementAndGet(); + } + }); + session.onClose(new Runnable() + { + + @Override + public void run() + { + b.incrementAndGet(); + } + }); + + assertEquals(0,b.get()); + } + + assertEquals(2,b.get()); + } + +} diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java index d41d2617b7..44d50768ca 100644 --- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; +import org.apache.felix.gogo.runtime.systemio.SystemIOImpl; import org.apache.felix.gogo.runtime.threadio.ThreadIOImpl; import org.junit.Before; import org.junit.Test; @@ -501,7 +502,9 @@ public void close() { public void close() { } }; - ThreadIOImpl tio = new ThreadIOImpl(); + SystemIOImpl sio = new SystemIOImpl(); + sio.start(); + ThreadIOImpl tio = new ThreadIOImpl(sio); tio.start(); try @@ -520,6 +523,7 @@ public void close() { finally { tio.stop(); + sio.stop(); } } diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestIOWithFramework.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestIOWithFramework.java new file mode 100644 index 0000000000..59a4c55c85 --- /dev/null +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestIOWithFramework.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime.threadio; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.felix.gogo.runtime.activator.Activator; +import org.apache.felix.gogo.runtime.activator.ServiceFacade; +import org.apache.felix.service.systemio.SystemIO; +import org.apache.felix.service.threadio.ThreadIO; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.osgi.framework.BundleContext; +import org.osgi.framework.BundleException; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceException; +import org.osgi.framework.ServiceReference; +import org.osgi.framework.launch.Framework; + +public class TestIOWithFramework +{ + private Framework framework; + private File tmp; + + @Before + public void setup() throws Exception + { + tmp = Files.createTempDirectory("TestIOWithFramework").toFile(); + } + + private void fw(Map<String, String> configuration) throws BundleException + { + if (configuration == null) + { + configuration = new HashMap<>(); + } + + configuration.put(Constants.FRAMEWORK_STORAGE, tmp.getAbsolutePath()); + configuration.put(Constants.FRAMEWORK_STORAGE_CLEAN, Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT); + framework = ServiceLoader.load(org.osgi.framework.launch.FrameworkFactory.class).iterator().next() + .newFramework(configuration); + framework.init(); + framework.start(); + } + + @After + public void after() throws Exception + { + framework.stop(); + framework.waitForStop(100000); + delete(tmp); + } + + void delete(File f) + { + if (f.isFile()) + { + f.delete(); + } + else + { + for (File sub : f.listFiles()) + { + delete(sub); + } + } + } + + @Test + public void testWithFrameworkAndDefault() throws Exception + { + fw(null); + BundleContext context = framework.getBundleContext(); + Activator a = new Activator(); + a.start(context); + assertNotNull(context.getServiceReference(SystemIO.class)); + assertNotNull(context.getServiceReference(ThreadIO.class)); + a.stop(context); + } + + @Test + public void testWithExternalSystemIO() throws Exception + { + Map<String, String> configuration = new HashMap<>(); + configuration.put("org.apache.felix.gogo.systemio.timeout", "5000"); + fw(configuration); + BundleContext context = framework.getBundleContext(); + + Closeable c = mock(Closeable.class); + SystemIO sio = mock(SystemIO.class); + when(sio.system(Mockito.any(InputStream.class), Mockito.any(OutputStream.class), Mockito.any(OutputStream.class))) + .thenReturn(c); + context.registerService(SystemIO.class, sio, null); + + Activator a = new Activator(); + a.start(context); + + ServiceReference<ThreadIO> ref = context.getServiceReference(ThreadIO.class); + assertNotNull(ref); + ThreadIO tio = context.getService(ref); + assertNotNull(tio); + tio.setStreams(null, null, null); + a.stop(context); + + Mockito.verify(c).close(); + } + + public interface Foo + { + void bar(); + } + + @Test(expected = ServiceException.class) + public void testFacadeWithoutService() throws BundleException, IOException + { + fw(null); + try (ServiceFacade<Foo> sf = new ServiceFacade<>(Foo.class, framework.getBundleContext(), 500)) + { + Foo foo = sf.get(); + + foo.bar(); + } + } + + @Test + public void testFacadeWithService() throws BundleException, IOException + { + fw(null); + final AtomicInteger ai = new AtomicInteger(0); + framework.getBundleContext().registerService(Foo.class, new Foo() { + + @Override + public void bar() + { + ai.incrementAndGet(); + } + + }, null); + try (ServiceFacade<Foo> sf = new ServiceFacade<>(Foo.class, framework.getBundleContext(), 5000)) + { + Foo foo = sf.get(); + + long time = System.currentTimeMillis(); + foo.bar(); + if ( System.currentTimeMillis() > time + 4000) + fail("Took too much time"); + + assertEquals(1, ai.get()); + } + } +} diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestThreadIO.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestThreadIO.java index 5ffe41737e..0b3b39e437 100644 --- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestThreadIO.java +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestThreadIO.java @@ -18,74 +18,154 @@ */ package org.apache.felix.gogo.runtime.threadio; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import org.apache.felix.gogo.runtime.systemio.SystemIOImpl; import org.junit.Assert; import org.junit.Test; -public class TestThreadIO { +public class TestThreadIO +{ - /** - * Test if the threadio works in a nested fashion. We first push - * ten markers on the stack and print a message for each, capturing - * the output in a ByteArrayOutputStream. Then we pop them, also printing - * a message identifying the level. Then we verify the output for each level. - */ - @Test - public void testNested() - { - ThreadIOImpl tio = new ThreadIOImpl(); - tio.start(); - List<ByteArrayOutputStream> list = new ArrayList<>(); - for (int i = 0; i < 10; i++) - { + /** + * Test if the threadio works in a nested fashion. We first push ten markers on the stack and print a message for + * each, capturing the output in a ByteArrayOutputStream. Then we pop them, also printing a message identifying the + * level. Then we verify the output for each level. + */ + @Test + public void testNested() + { + SystemIOImpl systemio = new SystemIOImpl(); + systemio.start(); + ThreadIOImpl tio = new ThreadIOImpl(systemio); + try + { + tio.start(); + List<ByteArrayOutputStream> list = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { ByteArrayOutputStream out = new ByteArrayOutputStream(); list.add(out); tio.setStreams(System.in, new PrintStream(out), System.err); System.out.print("b" + i); - } - for (int i = 9; i >= 0; i--) - { + } + for (int i = 9; i >= 0; i--) + { System.out.println("e" + i); tio.close(); - } - tio.stop(); - for (int i = 0; i < 10; i++) - { + } + for (int i = 0; i < 10; i++) + { String message = list.get(i).toString().trim(); Assert.assertEquals("b" + i + "e" + i, message); - } - } + } + } + finally + { + tio.stop(); + systemio.stop(); + } + } - /** - * Simple test too see if the basics work. - */ - @Test - public void testSimple() - { - ThreadIOImpl tio = new ThreadIOImpl(); - tio.start(); - System.out.println("Hello World"); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ByteArrayOutputStream err = new ByteArrayOutputStream(); - tio.setStreams(System.in, new PrintStream(out), new PrintStream(err)); - try - { - System.out.println("Simple Normal Message"); - System.err.println("Simple Error Message"); - } - finally - { - tio.close(); - } - tio.stop(); - String normal = out.toString().trim(); - //String error = err.toString().trim(); - Assert.assertEquals("Simple Normal Message", normal); - //assertEquals("Simple Error Message", error ); - System.out.println("Goodbye World"); - } + /** + * Simple test too see if the basics work. + * @throws IOException + */ + @SuppressWarnings("resource") + @Test + public void testSimple() throws IOException + { + SystemIOImpl systemio = new SystemIOImpl(); + systemio.start(); + ThreadIOImpl tio = new ThreadIOImpl(systemio); + tio.start(); + try + { + System.out.println("Hello World"); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + tio.setStreams(System.in, new PrintStream(out), new PrintStream(err)); + + System.out.println("Simple Normal Message"); + System.err.println("Simple Error Message"); + tio.stop(); + String normal = out.toString().trim(); + // String error = err.toString().trim(); + Assert.assertEquals("Simple Normal Message", normal); + // assertEquals("Simple Error Message", error ); + System.out.println("Goodbye World"); + } + finally + { + systemio.close(); + tio.close(); + } + } + + @Test + @SuppressWarnings("resource") + public void testNullInputStream() throws IOException { + SystemIOImpl systemio = new SystemIOImpl(); + systemio.start(); + ThreadIOImpl tio = new ThreadIOImpl(systemio); + tio.start(); + try + { + byte[] test = "abc".getBytes(StandardCharsets.UTF_8); + ByteArrayInputStream bin = new ByteArrayInputStream( test); + tio.setStreams(bin, null, null); + tio.setStreams(null, null, null); + byte data[] = new byte[3]; + System.in.read(data); + assertTrue(Arrays.equals(test, data)); + + tio.close(); + } + finally + { + tio.stop(); + systemio.close(); + } + } + + @Test + @SuppressWarnings("resource") + public void testNoInputStreamSetInThreadIO() throws IOException { + SystemIOImpl systemio = new SystemIOImpl(); + systemio.start(); + byte[] test = "abc".getBytes(StandardCharsets.UTF_8); + ByteArrayInputStream bin = new ByteArrayInputStream( test); + Closeable system = systemio.system(bin, null,null); + ThreadIOImpl tio = new ThreadIOImpl(systemio); + tio.start(); + try + { + byte data[] = new byte[3]; + System.in.read(data); + assertTrue(Arrays.equals(test, data)); + tio.close(); + } + finally + { + system.close(); + systemio.close(); + tio.stop(); + } + } + + @Test + public void testWithFrameworkService() { + + } }