Skip to content

Commit

Permalink
fix locks
Browse files Browse the repository at this point in the history
  • Loading branch information
firestar committed Jan 28, 2024
1 parent af70159 commit 9a4f92f
Show file tree
Hide file tree
Showing 17 changed files with 293 additions and 124 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'com.nucleodb'
version = '1.15.9'
version = '1.15.10'

repositories {
mavenCentral()
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/nucleodb/library/NucleoDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,21 @@ public NucleoDB(DBType dbType, String readToTime, Consumer<ConnectionConsumer> c
}
private void startLockManager(Consumer<LockConfig> customizer) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
LockConfig config = new LockConfig();
CountDownLatch lockManagerStartupComplete = new CountDownLatch(1);
config.setStartupRun(new StartupRun(){
@Override
public void run(LockManager lockManager) {
lockManagerStartupComplete.countDown();
}
});
if(customizer!=null) customizer.accept(config);
lockManager = new LockManager(config);
new Thread(lockManager).start();
try {
lockManagerStartupComplete.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void startConnections(String[] packagesToScan, DBType dbType, String readToTime, Consumer<ConnectionConsumer> customizer) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
logger.info("NucleoDB Connections Starting");
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/nucleodb/library/database/lock/LockConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.nucleodb.library.database.lock;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.nucleodb.library.database.utils.StartupRun;
import com.nucleodb.library.mqs.config.MQSConfiguration;
import com.nucleodb.library.mqs.kafka.KafkaConfiguration;

Expand All @@ -13,6 +15,9 @@ public class LockConfig implements Serializable {
Map<String, Object> settingsMap = new TreeMap<>();
String topic = "locks";

@JsonIgnore
private transient StartupRun startupRun = null;

public LockConfig() {
settingsMap.put("partitions", 1);
settingsMap.put("replicas", 3);
Expand Down Expand Up @@ -44,4 +49,12 @@ public void setTopic(String topic) {
this.topic = topic;
this.settingsMap.put("table", topic);
}

public StartupRun getStartupRun() {
return startupRun;
}

public void setStartupRun(StartupRun startupRun) {
this.startupRun = startupRun;
}
}
214 changes: 131 additions & 83 deletions src/main/java/com/nucleodb/library/database/lock/LockManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,197 +5,245 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.nucleodb.library.database.tables.table.DataTable;
import com.nucleodb.library.database.utils.Serializer;
import com.nucleodb.library.mqs.ConsumerHandler;
import com.nucleodb.library.mqs.ProducerHandler;
import org.jetbrains.annotations.NotNull;

import java.beans.IntrospectionException;
import java.lang.reflect.InvocationTargetException;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LockManager implements Runnable{

Queue<LockReference> queue = Queues.newLinkedBlockingQueue();

@JsonIgnore
private static Logger logger = Logger.getLogger(LockManager.class.getName());

private String ownerId = UUID.randomUUID().toString();

private ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1000);

ProducerHandler producerHandler;
ConsumerHandler consumerHandler;

@Override
public void run() {
try {
while (true) {
activeLocks.cleanUp();
Thread.sleep(1000L);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

private Map<String, LockReference> pendingLocks = new TreeMap<>();

private ConcurrentMap<String, ConcurrentLinkedQueue<LockReference>> waiting = new ConcurrentHashMap<>();
private LockConfig config;
public LockManager(LockConfig lockConfig) throws IntrospectionException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
this.config = lockConfig;
consumerHandler = lockConfig.getMqsConfiguration().createConsumerHandler(lockConfig.getSettingsMap());
consumerHandler.setLockManager(this);
consumerHandler.start();
consumerHandler.start(1);
lockConfig.getSettingsMap().put("consumerHandler", this.consumerHandler);
producerHandler = lockConfig.getMqsConfiguration().createProducerHandler(lockConfig.getSettingsMap());
}

private Map<String, LockReference> pendingLocks = new TreeMap<>();

private ConcurrentMap<String, ConcurrentLinkedQueue<LockReference>> waiting = new ConcurrentHashMap<>();
public void startup() {
if (this.config.getStartupRun() != null) {
this.config.getStartupRun().run(this);
}
}

private transient Cache<String, LockReference> activeLocks = CacheBuilder.newBuilder()
.maximumSize(10000)
.softValues()
.expireAfterWrite(2, TimeUnit.SECONDS)
.removalListener(e -> {
logger.log(Level.FINER, e.getCause().toString());
logger.log(Level.FINER, (String)e.getKey());
if (e.getValue() instanceof LockReference && e.getCause()== RemovalCause.EXPIRED) {
LockReference lock = ((LockReference) e.getValue());
lock.setLock(false);
logger.log(Level.FINER,"expiring "+lock.getRequest()+" "+Instant.now().toString());
lockAction(lock);
@Override
public void run() {
scheduledExecutorService.scheduleAtFixedRate(()->{
LockReference reference;
while((reference = queue.poll())!=null){
String entryKey = String.format("%s_%s", reference.getTableName(), reference.getKey());
pendingLocks.put(reference.getRequest(), reference);
ConcurrentLinkedQueue<LockReference> waitingLocks;
synchronized (waiting) {
if(!waiting.containsKey(entryKey)) {
waitingLocks = Queues.newConcurrentLinkedQueue();
waiting.put(entryKey, waitingLocks);
}else{
waitingLocks = waiting.get(entryKey);
}
}
})
.build();
waitingLocks.add(reference);
try {
producerHandler.push(entryKey, Serializer.getObjectMapper().getOm().writeValueAsString(reference));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}, 0, 10, TimeUnit.MILLISECONDS);

}



private transient ConcurrentMap<String, LockReference> activeLocks = new ConcurrentHashMap(){
@Override
public Object put(@NotNull Object key, @NotNull Object value) {
executorPool.schedule(()->{
// LockReference lockReference = (LockReference)this.get(key);
// if(lockReference!=null && value instanceof LockReference && lockReference.getRequest().equals(((LockReference)value).getRequest())) {
// logger.info( "EXPIRED");
// logger.info((String) key);
// try {
// LockReference activeLock = Serializer.getObjectMapper().getOm().readValue(Serializer.getObjectMapper().getOm().writeValueAsString(value), LockReference.class);
// activeLock.setLock(false);
// lockAction(activeLock);
// } catch (JsonProcessingException e) {
// e.printStackTrace();
// }
// }
}, 10000, TimeUnit.MILLISECONDS);
return super.put(key, value);
}
};
public LockReference waitForLock(String table, String key) throws InterruptedException {
String entryKey = String.format("%s_%s", table, key);
LockReference lockReference = new LockReference(table, key, ownerId, true);
pendingLocks.put(lockReference.getRequest(), lockReference);
ConcurrentLinkedQueue<LockReference> lockReferences = waiting.get(entryKey);
if (lockReferences == null) {
lockReferences = Queues.newConcurrentLinkedQueue();
waiting.put(entryKey, lockReferences);
}
lockReferences.add(lockReference);
try {
producerHandler.push(entryKey, Serializer.getObjectMapper().getOm().writeValueAsString(lockReference));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
addToQueue(lockReference);
synchronized (lockReference) {
logger.log(Level.FINER,"waiting for lock");
lockReference.wait();

}
return lockReference;
}

public void releaseLock(String table, String key, String lockReference) {
logger.log(Level.FINER,"RELEASE LOCK CALLED FOR "+lockReference);

if(lockReference==null) return;
log(lockReference, "RELEASE LOCK CALLED");
if(lockReference==null) {
return;
}
String entryKey = String.format("%s_%s", table, key);
LockReference activeLock = activeLocks.getIfPresent(entryKey);
logger.log(Level.FINER, String.format("%s", entryKey));
LockReference activeLock = activeLocks.get(entryKey);
if (activeLock != null) {
logger.log(Level.FINER,"LOCK FOUND");
if(!activeLock.getRequest().equals(lockReference)){
logger.log(Level.FINER,"CURRENT LOCK NOT REQUESTED RELEASE");
if (!activeLock.getRequest().equals(lockReference)) {
return;
}
if (!activeLock.getOwner().equals(ownerId)){
logger.log(Level.FINER,"NOT OWNER NO RELEASE");
if (!activeLock.getOwner().equals(ownerId)) {
return;
}
logger.log(Level.FINER,"RELEASE LOCK");
activeLock.setLock(false);
push(activeLock);
}else{
logger.log(Level.FINER,"LOCK NOT FOUND");
try {
activeLock = Serializer.getObjectMapper().getOm().readValue(Serializer.getObjectMapper().getOm().writeValueAsString(activeLock), LockReference.class);
activeLock.setLock(false);
push(activeLock);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}

public boolean availableForLock(String table, String key) {
LockReference ifPresent = activeLocks.getIfPresent(String.format("%s_%s", table, key));
LockReference ifPresent = activeLocks.get(String.format("%s_%s", table, key));
if (ifPresent == null) return true;
if (!ifPresent.getTime().plusSeconds(1).isAfter(Instant.now())) return true;
return false;
}

public boolean hasLock(String table, String key, String request) {
LockReference ifPresent = activeLocks.getIfPresent(String.format("%s_%s", table, key));
LockReference ifPresent = activeLocks.get(String.format("%s_%s", table, key));
if (ifPresent == null) return false;
if (!ifPresent.getRequest().equals(request)) return false;
if (!ifPresent.getOwner().equals(ownerId)) return false;
if (!ifPresent.getTime().plusSeconds(1).isAfter(Instant.now())) return false;
return true;
}

void addToQueue(LockReference lockReference){
queue.add(lockReference);
}
public void push(LockReference lockReference) {
try {
String key = String.format("%s_%s", lockReference.getTableName(), lockReference.getKey());
producerHandler.push(key, Serializer.getObjectMapper().getOm().writeValueAsString(lockReference));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
e.printStackTrace();
}
}

public void lockAction(LockReference lockReference) {
String key = String.format("%s_%s", lockReference.getTableName(), lockReference.getKey());
LockReference currentActiveLock = activeLocks.getIfPresent(key);
LockReference currentActiveLock = activeLocks.get(key);

if (lockReference.isLock()) {
if (currentActiveLock != null) {
if (currentActiveLock != null && !currentActiveLock.getRequest().equals(lockReference.getRequest())) {
log(currentActiveLock.getRequest(), "active lock check time");
// do not lock, wait for lock to release
if (currentActiveLock.getTime().plusSeconds(2).isAfter(Instant.now())) return;
if (currentActiveLock.getTime().plusSeconds(2).isAfter(Instant.now())) {
log(currentActiveLock.getRequest(), "active lock check time: WAIT FOR LOCK");
return;
}
}
logger.log(Level.FINER,"lock request: "+lockReference.getRequest());
//activeLocks.invalidate(key);
activeLocks.put(key, lockReference);
if(waiting.containsKey(key)) waiting.get(key).removeIf(c -> c.getRequest().equals(lockReference.getRequest()));
LockReference lockReferenceLocalObject = pendingLocks.remove(lockReference.getRequest());
if (lockReferenceLocalObject != null) {
synchronized (lockReferenceLocalObject) {
lockReferenceLocalObject.notify();
}
}

} else {
} else { // lock is false, release current active lock

if (currentActiveLock != null) {
logger.log(Level.FINER,"lockAction: ACTIVE LOCK FOUND");
log(currentActiveLock.getRequest()+" ][ "+lockReference.getRequest(), "lockAction: ACTIVE LOCK FOUND");
if(lockReference.getRequest().equals(currentActiveLock.getRequest())) {
activeLocks.invalidate(currentActiveLock);
logger.log(Level.FINER,"lockAction: CURRENT LOCK REMOVED");
activeLocks.remove(key);
log(currentActiveLock.getRequest()+" ][ "+lockReference.getRequest(), "lockAction: CURRENT LOCK REMOVED");
}else{
logger.log(Level.FINER,"lockAction: REQUEST NOT SAME AS CURRENT LOCK");
log(currentActiveLock.getRequest()+" ][ "+lockReference.getRequest(), "lockAction: REQUEST NOT SAME AS CURRENT LOCK");
return;
}
}else{
logger.log(Level.FINER,"lockAction: NO ACTIVE LOCK");
log(lockReference.getRequest(), "lockAction: NO ACTIVE LOCK");
return;
}

ConcurrentLinkedQueue<LockReference> lockReferences = waiting.get(key);
if (lockReferences != null) {
logger.log(Level.FINER,"lockAction: LOCK REFERENCES FOR: "+key);
logger.log(Level.FINER,"lockAction: REMOVING FROM WAITING "+lockReference.getRequest());
lockReferences.removeIf(c -> c.getRequest().equals(lockReference.getRequest()));
Optional<LockReference> first = lockReferences.stream().findFirst();
if (first.isPresent()) {
logger.log(Level.FINER,"lockAction: SENDING TO LOCK "+first.get().getRequest());
push(first.get());
if (lockReferences != null && lockReference.getRequest().equals(currentActiveLock.getRequest())) {
log(lockReference.getRequest(), "lockAction: GETTING WAITING LOCK REQUESTS FOR: "+key);

Iterator<LockReference> iterator = lockReferences.iterator();
if (iterator.hasNext()) {
LockReference first = iterator.next();
log(first.getRequest(), "lockAction: SENDING TO WAITING LOCK ");
push(first);
} else {
logger.log(Level.FINER,"lockAction: NO NEW WAITING LOCKS");
log(key, "lockAction: NO NEW WAITING LOCKS");
//waiting.remove(key);
//activeLocks.invalidate(key);
//activeLocks.remove(key);
}
}

}
}

void log(String key, String msg){
logger.log(Level.FINE,String.format("%s: %s", key, msg));
}

public ConcurrentMap<String, LockReference> getActiveLocks() {
return activeLocks;
}

public void setActiveLocks(ConcurrentMap<String, LockReference> activeLocks) {
this.activeLocks = activeLocks;
}
}
Loading

0 comments on commit 9a4f92f

Please sign in to comment.