Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rebind #1194

Merged
merged 11 commits into from
Feb 10, 2014
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ prodDb.*
*.log
brooklyn*.log.*

*brooklyn-persisted-state/

ignored
10 changes: 10 additions & 0 deletions api/src/main/java/brooklyn/mementos/Memento.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,14 @@ public interface Memento extends Serializable {
public Map<String, ? extends Object> getCustomFields();

public String toVerboseString();

public void injectTypeClass(Class<?> clazz);

/**
* Returns the injected type class, or null if not injected.
* <p>
* This is useful for ensuring the correct classloader is used (e.g. for {@link EntityMemento}
* previously calling {@code EntityTypes.getDefinedSensors(getType())}.
*/
public Class<?> getTypeClass();
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,25 @@ private void persistNow() {
} else {
PersisterDeltaImpl persisterDelta = new PersisterDeltaImpl();
for (Location location : prevDeltaCollector.locations) {
persisterDelta.locations.add(((LocationInternal)location).getRebindSupport().getMemento());
try {
persisterDelta.locations.add(((LocationInternal)location).getRebindSupport().getMemento());
} catch (Exception e) {
handleGenerateMementoException(e, "location "+location.getClass().getSimpleName()+"("+location.getId()+")");
}
}
for (Entity entity : prevDeltaCollector.entities) {
persisterDelta.entities.add(((EntityInternal)entity).getRebindSupport().getMemento());
try {
persisterDelta.entities.add(((EntityInternal)entity).getRebindSupport().getMemento());
} catch (Exception e) {
handleGenerateMementoException(e, "entity "+entity.getEntityType().getSimpleName()+"("+entity.getId()+")");
}
}
for (Policy policy : prevDeltaCollector.policies) {
persisterDelta.policies.add(policy.getRebindSupport().getMemento());
try {
persisterDelta.policies.add(policy.getRebindSupport().getMemento());
} catch (Exception e) {
handleGenerateMementoException(e, "location "+policy.getClass().getSimpleName()+"("+policy.getId()+")");
}
}
persisterDelta.removedLocationIds = prevDeltaCollector.removedLocationIds;
persisterDelta.removedEntityIds = prevDeltaCollector.removedEntityIds;
Expand Down Expand Up @@ -211,22 +223,34 @@ private void persistNow() {
}
}

protected void handleGenerateMementoException(Exception e, String context) {
Exceptions.propagateIfFatal(e);
if (isActive()) {
LOG.warn("Problem generating memento for "+context, e);
} else {
LOG.debug("Problem generating memento for "+context+", but no longer active (ignoring)", e);
}
}

@Override
public synchronized void onManaged(Entity entity) {
if (LOG.isTraceEnabled()) LOG.trace("onManaged: {}", entity);
if (!isStopped()) {
onChanged(entity);
}
}

@Override
public synchronized void onManaged(Location location) {
if (LOG.isTraceEnabled()) LOG.trace("onManaged: {}", location);
if (!isStopped()) {
onChanged(location);
}
}

@Override
public synchronized void onChanged(Entity entity) {
if (LOG.isTraceEnabled()) LOG.trace("onChanged: {}", entity);
if (!isStopped()) {
deltaCollector.entities.add(entity);

Expand All @@ -247,6 +271,7 @@ public synchronized void onChanged(Entity entity) {

@Override
public synchronized void onUnmanaged(Entity entity) {
if (LOG.isTraceEnabled()) LOG.trace("onUnmanaged: {}", entity);
if (!isStopped()) {
deltaCollector.removedEntityIds.add(entity.getId());
deltaCollector.entities.remove(entity);
Expand All @@ -255,6 +280,7 @@ public synchronized void onUnmanaged(Entity entity) {

@Override
public synchronized void onUnmanaged(Location location) {
if (LOG.isTraceEnabled()) LOG.trace("onUnmanaged: {}", location);
if (!isStopped()) {
deltaCollector.removedLocationIds.add(location.getId());
deltaCollector.locations.remove(location);
Expand All @@ -263,13 +289,15 @@ public synchronized void onUnmanaged(Location location) {

@Override
public synchronized void onChanged(Location location) {
if (LOG.isTraceEnabled()) LOG.trace("onChanged: {}", location);
if (!isStopped()) {
deltaCollector.locations.add(location);
}
}

@Override
public synchronized void onChanged(Policy policy) {
if (LOG.isTraceEnabled()) LOG.trace("onChanged: {}", policy);
if (!isStopped()) {
deltaCollector.policies.add(policy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ public List<Application> rebind(final ClassLoader classLoader) throws IOExceptio
if (LOG.isDebugEnabled()) LOG.debug("RebindManager instantiating entity {}", entityMemento);

Entity entity = newEntity(entityMemento, reflections);
managementContext.prePreManage(entity);
entities.put(entityMemento.getId(), entity);
rebindContext.registerEntity(entityMemento.getId(), entity);
}
Expand Down Expand Up @@ -259,6 +258,7 @@ public List<Application> rebind(final ClassLoader classLoader) throws IOExceptio
Entity entity = rebindContext.getEntity(entityMemento.getId());
if (LOG.isDebugEnabled()) LOG.debug("RebindManager reconstructing entity {}", entityMemento);

entityMemento.injectTypeClass(entity.getClass());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice solution

((EntityInternal)entity).getRebindSupport().reconstruct(rebindContext, entityMemento);
}

Expand Down Expand Up @@ -286,7 +286,7 @@ public List<Application> rebind(final ClassLoader classLoader) throws IOExceptio
LOG.info("RebindManager complete; return apps: {}", memento.getApplicationIds());
return apps;
} catch (Exception e) {
LOG.warn("Problem during rebinid (rethrowing)", e);
LOG.warn("Problem during rebind (rethrowing)", e);
throw Exceptions.propagate(e);
}
}
Expand All @@ -295,6 +295,7 @@ private Entity newEntity(EntityMemento memento, Reflections reflections) {
String entityId = memento.getId();
String entityType = checkNotNull(memento.getType(), "entityType of "+entityId);
Class<? extends Entity> entityClazz = (Class<? extends Entity>) reflections.loadClass(entityType);
memento.injectTypeClass(entityClazz);

if (InternalEntityFactory.isNewStyleEntity(managementContext, entityClazz)) {
// Not using entityManager.createEntity(EntitySpec) because don't want init() to be called
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ protected static abstract class Builder<B extends Builder<?>> {
protected String brooklynVersion = BrooklynVersion.get();
protected String id;
protected String type;
protected Class<?> typeClass;
protected String displayName;
protected Map<String, Object> fields = Maps.newLinkedHashMap();

Expand All @@ -33,6 +34,7 @@ public B from(Memento other) {
brooklynVersion = other.getBrooklynVersion();
id = other.getId();
type = other.getType();
typeClass = other.getTypeClass();
displayName = other.getDisplayName();
fields.putAll(other.getCustomFields());
return self();
Expand All @@ -46,6 +48,9 @@ public B id(String val) {
public B type(String val) {
type = val; return self();
}
public B typeClass(Class<?> val) {
typeClass = val; return self();
}
public B displayName(String val) {
displayName = val; return self();
}
Expand All @@ -58,7 +63,9 @@ public B customFields(Map<String,?> vals) {
private String type;
private String id;
private String displayName;


private transient Class<?> typeClass;

// for de-serialization
protected AbstractMemento() {
}
Expand All @@ -68,6 +75,7 @@ protected AbstractMemento(Builder<?> builder) {
brooklynVersion = builder.brooklynVersion;
id = builder.id;
type = builder.type;
typeClass = builder.typeClass;
displayName = builder.displayName;
setCustomFields(builder.fields);
}
Expand All @@ -76,6 +84,16 @@ protected AbstractMemento(Builder<?> builder) {
// but the method declared here simplifies how it is connected in via builder etc
protected abstract void setCustomFields(Map<String, Object> fields);

@Override
public void injectTypeClass(Class<?> clazz) {
this.typeClass = clazz;
}

@Override
public Class<?> getTypeClass() {
return typeClass;
}

@Override
public String getBrooklynVersion() {
return brooklynVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;

import brooklyn.config.ConfigKey;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.AbstractEntity;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.EntityTypes;
Expand Down Expand Up @@ -141,8 +142,10 @@ protected BasicEntityMemento(Builder builder) {
}

protected synchronized Map<String, ConfigKey<?>> getStaticConfigKeys() {
if (staticConfigKeys==null)
staticConfigKeys = EntityTypes.getDefinedConfigKeys(getType());
if (staticConfigKeys==null) {
Class<? extends Entity> clazz = (Class<? extends Entity>) getTypeClass();
staticConfigKeys = (clazz == null) ? EntityTypes.getDefinedConfigKeys(getType()) : EntityTypes.getDefinedConfigKeys(clazz);
}
return staticConfigKeys;
}

Expand All @@ -155,8 +158,10 @@ protected ConfigKey<?> getConfigKey(String key) {
}

protected synchronized Map<String, Sensor<?>> getStaticSensorKeys() {
if (staticSensorKeys==null)
staticSensorKeys = EntityTypes.getDefinedSensors(getType());
if (staticSensorKeys==null) {
Class<? extends Entity> clazz = (Class<? extends Entity>) getTypeClass();
staticSensorKeys = (clazz == null) ? EntityTypes.getDefinedSensors(getType()) : EntityTypes.getDefinedSensors(clazz);
}
return staticSensorKeys;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static BasicEntityMemento.Builder newEntityMementoBuilder(Entity entity)
builder.id = entity.getId();
builder.displayName = entity.getDisplayName();
builder.type = entity.getClass().getName();
builder.typeClass = entity.getClass();
builder.isTopLevelApp = (entity instanceof Application && entity.getParent() == null);

Map<ConfigKey<?>, Object> localConfig = ((EntityInternal)entity).getConfigMap().getLocalConfig();
Expand Down Expand Up @@ -169,6 +170,7 @@ public static BasicLocationMemento.Builder newLocationMementoBuilder(Location lo
ConfigBag persistableConfig = new ConfigBag().copy( ((AbstractLocation)location).getLocalConfigBag() ).removeAll(nonPersistableFlagNames);

builder.type = location.getClass().getName();
builder.typeClass = location.getClass();
builder.id = location.getId();
builder.displayName = location.getDisplayName();
builder.copyConfig(persistableConfig);
Expand All @@ -195,6 +197,7 @@ public static BasicPolicyMemento.Builder newPolicyMementoBuilder(Policy policy)
BasicPolicyMemento.Builder builder = BasicPolicyMemento.builder();

builder.type = policy.getClass().getName();
builder.typeClass = policy.getClass();
builder.id = policy.getId();
builder.displayName = policy.getName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.time.Time;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -75,6 +75,7 @@ public void delete() {
return;
}

requireWrite.set(null);
requireDelete.set(true);
if (executing.compareAndSet(false, true)) {
if (LOG.isTraceEnabled()) LOG.trace("Submitting delete task for {}", file);
Expand Down Expand Up @@ -124,10 +125,11 @@ private void deleteAsync() {
} catch (Throwable t) {
if (executor.isShutdown()) {
LOG.debug("Error deleting "+file+" (but executor shutdown)", t);
return null; // just return without throwing; no more work to do
} else {
LOG.error("Error deleting "+file, t);
throw Exceptions.propagate(t);
}
throw Throwables.propagate(t);
}
}});
addPostExecListener(future);
Expand All @@ -142,10 +144,11 @@ private void writeAsync() {
} catch (Throwable t) {
if (executor.isShutdown()) {
LOG.debug("Error writing to "+file+" (but executor shutdown)", t);
return null; // just return without throwing; no more work to do
} else {
LOG.error("Error writing to "+file, t);
throw Exceptions.propagate(t);
}
throw Throwables.propagate(t);
}
}});
addPostExecListener(future);
Expand Down Expand Up @@ -177,8 +180,13 @@ private void addPostExecListener(ListenableFuture<?> future) {
if (LOG.isTraceEnabled()) LOG.trace("No pending exec-requirements for {}", file);
}
} catch (Throwable t) {
LOG.error("Error in post-exec for "+file, t);
throw Throwables.propagate(t);
if (executor.isShutdown()) {
LOG.debug("Error in post-exec for "+file+" (but executor shutdown)", t);
return; // just return without throwing; no more work to do
} else {
LOG.error("Error in post-exec for "+file, t);
throw Exceptions.propagate(t);
}
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ synchronized boolean isPreRegistered(Entity e) {

synchronized void prePreManage(Entity entity) {
if (isPreRegistered(entity)) {
log.warn(""+this+" redundant call to pre-pre-manage entity"+entity+"; skipping",
log.warn(""+this+" redundant call to pre-pre-manage entity "+entity+"; skipping",
new Exception("source of duplicate pre-pre-manage of "+entity));
return;
}
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/brooklyn/util/ResourceUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,11 @@ public InputStream getResourceFromUrl(String url) {
}
throw new IOException("'"+orig+"' not found on classpath or filesystem");
} catch (Exception e) {
if (context!=null)
if (context!=null) {
throw new RuntimeException("Error getting resource '"+url+"' for "+context+": "+e, e);
else throw new RuntimeException(e);
} else {
throw Exceptions.propagate(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public void uncaughtException(Thread t, Throwable e) {

public void shutdownNow() {
runner.shutdownNow();
delayedRunner.shutdownNow();
}

public void addListener(ExecutionListener listener) {
Expand Down
Loading