Skip to content

Commit

Permalink
add direct fromtolabel fromlabel connection lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
firestar committed Oct 26, 2023
1 parent 03a52f2 commit f0663b7
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
2 changes: 1 addition & 1 deletion buildSrc/src/main/groovy/nucleodb.app.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

group = 'com.nucleodb'
version = '1.7.2'
version = '1.7.3'

repositories {
mavenCentral()
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/src/main/groovy/nucleodb.library.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

group = 'com.nucleodb'
version = '1.9.2'
version = '1.9.3'

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ConnectionHandler implements Serializable{
private Map<Integer, Long> partitionOffsets = new TreeMap<>();

private Set<Connection> allConnections = new TreeSetExt<>();

private transient NucleoDB nucleoDB;

private ConnectionConfig config;
Expand Down Expand Up @@ -157,9 +158,9 @@ public Stream<Connection> getStream(DataEntry de){
}

public Set<Connection> getByLabel(DataEntry de, String label){
Set<Connection> tmp = connections.get(de.getKey());
Set<Connection> tmp = connections.get(de.getKey()+label);
if(tmp!=null) {
return tmp.stream().filter(c->c.getLabel().equals(label)).map(c->c.clone()).collect(Collectors.toSet());
return tmp.stream().map(c->c.clone()).collect(Collectors.toSet());
}
return null;
}
Expand All @@ -170,14 +171,36 @@ public Stream<Connection> getByLabelStream(DataEntry de, String label){
}
return null;
}
private void addConnection(Connection connection){
connection.connectionHandler = this;
connectionByUUID.put(connection.getUuid(), connection);
String key = connection.getFromKey();

public Set<Connection> getByLabelTo(DataEntry de, String label, DataEntry toDe){
Set<Connection> tmp = connections.get(de.getKey()+toDe.getKey()+label);
if(tmp!=null) {
return tmp.stream().map(c->c.clone()).collect(Collectors.toSet());
}
return null;
}
public Stream<Connection> getByLabelToStream(DataEntry de, String label,DataEntry toDe){
Set<Connection> tmp = getByLabelTo(de, label, toDe);
if(tmp!=null){
return tmp.stream();
}
return null;
}

private void putConnectionInKey(String key, Connection connection){
if(!connections.containsKey(key)){
connections.put(key, new TreeSetExt<>());
}
connections.get(key).add(connection);
}

private void addConnection(Connection connection){
connection.connectionHandler = this;
connectionByUUID.put(connection.getUuid(), connection);
String connectionKey = connection.getFromKey();
this.putConnectionInKey(connectionKey, connection);
this.putConnectionInKey(connection.getFromKey()+connection.getLabel(), connection);
this.putConnectionInKey(connection.getFromKey()+connection.getToKey()+connection.getLabel(), connection);
allConnections.add(connection);
}
private void removeConnection(Connection connection){
Expand Down Expand Up @@ -293,9 +316,7 @@ private boolean saveInternalConsumerSync(Connection connection) throws Interrupt
}
String changeUUID = UUID.randomUUID().toString();
CountDownLatch countDownLatch = new CountDownLatch(1);
consumers.put(changeUUID, (conn) -> {
countDownLatch.countDown();
});
consumers.put(changeUUID, (conn) -> countDownLatch.countDown());
if (saveInternalSync(connection, changeUUID)) {
countDownLatch.await();
return true;
Expand Down

0 comments on commit f0663b7

Please sign in to comment.