Skip to content

Commit

Permalink
[H2] Compute separate memory estimate for each obs type so DB write
Browse files Browse the repository at this point in the history
buffers and cache sizes are computed correctly
  • Loading branch information
alexrobin committed May 9, 2023
1 parent 2d58146 commit dbf3931
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.h2.mvstore.MVMap;
import org.sensorhub.api.common.BigId;
import org.sensorhub.api.data.IObsData;
import org.sensorhub.api.data.ObsData;
import org.sensorhub.impl.datastore.h2.kryo.KryoDataType;
import org.sensorhub.impl.datastore.h2.kryo.PersistentClassResolver;
Expand Down Expand Up @@ -48,4 +49,10 @@ class ObsDataType extends KryoDataType
kryo.addDefaultSerializer(BigId.class, BigIdSerializers.factory(idScope));
};
}


protected long getRecordTypeKey(Object obj)
{
return ((IObsData)obj).getDataStreamID().getIdAsLong();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package org.sensorhub.impl.datastore.h2.kryo;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.h2.mvstore.WriteBuffer;
Expand All @@ -38,21 +40,22 @@ public class KryoDataType implements DataType
{
static Logger log = LoggerFactory.getLogger(KryoDataType.class);

final Pool<KryoInstance> kryoPool;
//final ThreadLocal<KryoInstance> kryoLocal;
final Pool<KryoInstance> kryoReadPool;
final Map<Long, KryoInstance> kryoWritePool;
protected Supplier<ClassResolver> classResolver;
protected Consumer<Kryo> configurator;
protected SerializerFactory<?> defaultObjectSerializer;
protected int averageRecordSize, maxRecordSize;
protected int maxWriteBufferSize;


static class KryoInstance
{
Kryo kryo;
Output output;
Input input;
int avgRecordSize;

KryoInstance(SerializerFactory<?> defaultObjectSerializer, ClassResolver classResolver, Consumer<Kryo> configurator, int bufferSize, int maxBufferSize)
KryoInstance(SerializerFactory<?> defaultObjectSerializer, ClassResolver classResolver, Consumer<Kryo> configurator)
{
kryo = classResolver != null ? new Kryo(classResolver, null) : new Kryo();
kryo.setRegistrationRequired(false);
Expand All @@ -74,7 +77,6 @@ static class KryoInstance
((PersistentClassResolver) classResolver).loadMappings();

input = new Input();
output = new Output(bufferSize, maxBufferSize);

log.debug("{}: kryo={}, classResolver={}",
Thread.currentThread(),
Expand All @@ -90,112 +92,132 @@ public KryoDataType()
}


public KryoDataType(final int maxRecordSize)
public KryoDataType(final int maxWriteBufferSize)
{
//Log.set(Log.LEVEL_TRACE);
this.maxRecordSize = maxRecordSize;
this.maxWriteBufferSize = maxWriteBufferSize;

// set default serializer to our versioned serializer
this.defaultObjectSerializer = VersionedSerializer.<Object>factory2(ImmutableMap.of(
H2Utils.CURRENT_VERSION, new SerializerFactory.FieldSerializerFactory()),
H2Utils.CURRENT_VERSION);

// use a pool of kryo objects
this.kryoPool = new Pool<KryoInstance>(true, false, 2*Runtime.getRuntime().availableProcessors()) {
// use a pool of kryo objects for reading
this.kryoReadPool = new Pool<KryoInstance>(true, false, 2*Runtime.getRuntime().availableProcessors()) {
@Override
protected KryoInstance create()
{
return new KryoInstance(
defaultObjectSerializer,
classResolver != null ? classResolver.get() : null,
configurator,
2*averageRecordSize,
maxRecordSize);
configurator);
}
};

/*// we get the object from thread local everytime
this.kryoLocal = new ThreadLocal<KryoInstance>()
{
public KryoInstance initialValue()
{
//log.debug("Loading Kryo instance for " + KryoDataType.this.getClass().getSimpleName());
return new KryoInstance(
defaultObjectSerializer,
classResolver != null ? classResolver.get() : null,
configurator,
2*averageRecordSize,
maxRecordSize);
}
};*/
// use a map of kryo objects for writing
// the map provides a separate kryo object for each record type
// so we can keep track of average size for each type of record separately
this.kryoWritePool = new HashMap<>();
}


protected KryoInstance getKryo()
protected KryoInstance getReadKryo()
{
//return kryoLocal.get();
return kryoPool.obtain();
return kryoReadPool.obtain();
}


protected void releaseKryo(KryoInstance kryoI)
protected KryoInstance getWriteKryo(Object obj)
{
kryoPool.free(kryoI);
var key = getRecordTypeKey(obj);
var kryoI = kryoWritePool.computeIfAbsent(key, k -> {
var kryo = new KryoInstance(
defaultObjectSerializer,
classResolver != null ? classResolver.get() : null,
configurator);

// compute initial record size and buffer size
kryo.output = new Output(8*1024, maxWriteBufferSize);
initRecordSize(kryo, obj);

return kryo;
});

/*System.err.println(obj.getClass().getCanonicalName() + "(" + key + "): " +
"avgRecordSize=" + kryoI.avgRecordSize + ", " +
"writeBufferSize=" + kryoI.output.getBuffer().length);*/
return kryoI;
}


protected void releaseReadKryo(KryoInstance kryoI)
{
kryoReadPool.free(kryoI);
}


@Override
public int getMemory(Object obj)
{
initRecordSize(obj);
return averageRecordSize;
return computeRecordSize(obj);
}


@Override
public void write(WriteBuffer buff, Object obj)
public Object read(ByteBuffer buff)
{
KryoInstance kryoI = getKryo();
write(buff, obj, kryoI);
releaseKryo(kryoI);
KryoInstance kryoI = getReadKryo();
var obj = read(buff, kryoI);
releaseReadKryo(kryoI);
return obj;
}


@Override
public void write(WriteBuffer buff, Object[] obj, int len, boolean key)
public void read(ByteBuffer buff, Object[] obj, int len, boolean key)
{
KryoInstance kryoI = getKryo();
KryoInstance kryoI = getReadKryo();
for (int i = 0; i < len; i++)
write(buff, obj[i], kryoI);
releaseKryo(kryoI);
obj[i] = read(buff, kryoI);
releaseReadKryo(kryoI);
}


@Override
public Object read(ByteBuffer buff)
protected Object read(ByteBuffer buff, KryoInstance kryoI)
{
KryoInstance kryoI = getKryo();
var obj = read(buff, kryoI);
releaseKryo(kryoI);
Kryo kryo = kryoI.kryo;
Input input = kryoI.input;

input.setBuffer(buff.array(), buff.position(), buff.remaining());
Object obj = kryo.readClassAndObject(input);
buff.position(input.position());

return obj;
}


@Override
public void read(ByteBuffer buff, Object[] obj, int len, boolean key)
public void write(WriteBuffer buff, Object obj)
{
KryoInstance kryoI = getWriteKryo(obj);
write(buff, obj, kryoI);
}


@Override
public void write(WriteBuffer buff, Object[] objects, int len, boolean key)
{
KryoInstance kryoI = getKryo();
for (int i = 0; i < len; i++)
obj[i] = read(buff, kryoI);
releaseKryo(kryoI);
{
var obj = objects[i];
KryoInstance kryoI = getWriteKryo(obj);
write(buff, obj, kryoI);
}
}


protected void write(WriteBuffer buff, Object obj, KryoInstance kryoI)
{
initRecordSize(obj, kryoI);

Kryo kryo = kryoI.kryo;
Output output = kryoI.output;
output.setPosition(0);
Expand All @@ -204,47 +226,49 @@ protected void write(WriteBuffer buff, Object obj, KryoInstance kryoI)
kryo.writeClassAndObject(output, obj);
buff.put(output.getBuffer(), 0, output.position());

// adjust the average size using an exponential moving average
// adjust the average size
int size = output.position();
averageRecordSize = (size + averageRecordSize*4) / 5;
updateRecordSize(kryoI, obj, size);
}


protected void initRecordSize(Object obj)
/**
* Gets a key that is unique per record type
* This must be overridden if several types of records of different
* sizes are multiplexed in the same datastore (e.g. observations)
* @param obj
* @return
*/
protected long getRecordTypeKey(Object obj)
{
if (averageRecordSize <= 0)
{
KryoInstance kryoI = getKryo();
initRecordSize(obj, kryoI);
releaseKryo(kryoI);
}
return 0;
}


protected void initRecordSize(Object obj, KryoInstance kryoI)
/* Methods used to automatically compute average record size */
/* This is used to inform H2 of the approximate memory used by records */

protected int computeRecordSize(Object obj)
{
if (averageRecordSize <= 0)
{
Kryo kryo = kryoI.kryo;
Output output = kryoI.output;
output.setPosition(0);
kryo.writeClassAndObject(output, obj);
averageRecordSize = output.position();
output.setBuffer(new byte[averageRecordSize*2], maxRecordSize);
}
KryoInstance kryoI = getWriteKryo(obj);
return (int)kryoI.avgRecordSize;
}


protected Object read(ByteBuffer buff, KryoInstance kryoI)
protected void updateRecordSize(KryoInstance kryoI, Object obj, int size)
{
// adjust average record size using an exponential moving average
kryoI.avgRecordSize = (size + 15*kryoI.avgRecordSize) / 16;
}


protected void initRecordSize(KryoInstance kryoI, Object obj)
{
Kryo kryo = kryoI.kryo;
Input input = kryoI.input;

input.setBuffer(buff.array(), buff.position(), buff.remaining());
Object obj = kryo.readClassAndObject(input);
buff.position(input.position());

return obj;
Output output = kryoI.output;
output.setPosition(0);
kryo.writeClassAndObject(output, obj);
kryoI.avgRecordSize = output.position();
}


Expand Down

0 comments on commit dbf3931

Please sign in to comment.