Skip to content

Commit

Permalink
Merge pull request #13 from ao/develop
Browse files Browse the repository at this point in the history
Wips
  • Loading branch information
ao authored Apr 21, 2021
2 parents a1a6d38 + 01179b4 commit a232964
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 111 deletions.
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
FROM openjdk:8
COPY ./out/production/ADD/ /tmp
COPY ./target/Serengeti-1.2-SNAPSHOT-jar-with-dependencies.jar /tmp
WORKDIR /tmp
EXPOSE 1985
ENTRYPOINT ["java","gl.ao.serengeti.Serengeti"]
#ENTRYPOINT ["java","gl.ao.serengeti.Serengeti"]
CMD java -jar Serengeti-1.2-SNAPSHOT-jar-with-dependencies.jar
Empty file removed data/.gitkeep
Empty file.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
<artifactId>junit-jupiter-api</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.12</version>
<scope>test</scope>
</dependency>
</dependencies>

<scm>
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/gl/ao/serengeti/data/DatabaseObjectData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package gl.ao.serengeti.data;

public class DatabaseObjectData {

private byte[] data;

public DatabaseObjectData() {}
public DatabaseObjectData(byte[] data) {
this.data = data;
}

public byte[] getData() {
return data;
}

public void setData(byte[] data) {
this.data = data;
}
}
16 changes: 4 additions & 12 deletions src/main/java/gl/ao/serengeti/helpers/Globals.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ public class Globals {
public static String index_filename = "index.file";

public static int port_default = 1985;
public static int port_communication = 19851;

public static int max_cluster_nodes = 3;

/***
* Convert to Bytes
Expand Down Expand Up @@ -184,18 +181,13 @@ public static String getHost4Address() throws SocketException {
* @param path
*/
static public void deleteDirectory(File path) {
if (path == null)
return;
if (path == null) return;
if (path.exists()) {
for(File f : Objects.requireNonNull(path.listFiles())) {
if(f.isDirectory()) {
deleteDirectory(f);
f.delete();
} else {
f.delete();
}
if (f.isDirectory()) deleteDirectory(f);
f.delete();
}
path.delete();
boolean delete = path.delete();
}
}

Expand Down
63 changes: 29 additions & 34 deletions src/main/java/gl/ao/serengeti/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public void requestNetworkMetas() {
JSONObject jsonObject = node.getValue();

try {
HttpURLConnection con = (HttpURLConnection) new URL("http://" + jsonObject.getString("ip") + ":" + Globals.port_default + "/meta").openConnection();
HttpURLConnection con = (HttpURLConnection) new URL(
String.format("http://%s:%d/meta", jsonObject.getString("ip"), Globals.port_default))
.openConnection();
con.setRequestMethod("GET");
con.getDoOutput();
con.setConnectTimeout(networkTimeout);
Expand All @@ -87,35 +89,41 @@ public void requestNetworkMetas() {
JSONArray jsonArr = (JSONArray) jsonMeta.get(db);

if (!Serengeti.storage.databaseExists(db)) {
System.out.println("Startup: Creating missing database '"+db+"'");
System.out.printf("Startup: Creating missing database '%s'%n", db);
Serengeti.storage.createDatabase(db, true);
changesFound++;
}

for (int i = 0; i < jsonArr.length(); i++) {
String table = jsonArr.getString(i);
if (!Serengeti.storage.tableExists(db, table)) {
System.out.println("Startup: Creating missing table '"+table+"' for database '"+db+"'");
System.out.printf("Startup: Creating missing table '%s' for database '%s'%n", table, db);
Serengeti.storage.createTable(db, table, true);
changesFound++;

String row_replicas = Serengeti.network.communicateQueryLogSingleNode( jsonObject.getString("id"), jsonObject.getString("ip"), new JSONObject(){{
put("type", "SendTableReplicaToNode");
put("db", db);
put("table", table);
put("node_id", Serengeti.server.server_constants.id);
put("node_ip", Serengeti.network.myIP);
}}.toString() );
String row_replicas = Serengeti.network.communicateQueryLogSingleNode(
jsonObject.getString("id"),
jsonObject.getString("ip"),
new JSONObject() {{
put("type", "SendTableReplicaToNode");
put("db", db);
put("table", table);
put("node_id", Serengeti.server.server_constants.id);
put("node_ip", Serengeti.network.myIP);
}}.toString()
);

try {
JSONObject jsonRowsReplica = new JSONObject(row_replicas);
Iterator<String> jkeys = jsonRowsReplica.keys();
while (jkeys.hasNext()) {
String jrow_id = jkeys.next();
JSONObject _json = new JSONObject(jsonRowsReplica.getString(jrow_id));
Storage.tableReplicaObjects.get(db+"#"+table).insertOrReplace(jrow_id, _json);
Storage.tableReplicaObjects
.get(db+"#"+table)
.insertOrReplace(jrow_id, _json);
}
} catch (Exception e) {}
} catch (Exception ignore) {}
}
}
}
Expand All @@ -125,15 +133,14 @@ public void requestNetworkMetas() {
}



} catch (Exception e) {
e.printStackTrace();
}

}
}
online = true;
System.out.println("Startup: Completed with "+changesFound+" changes found");
System.out.printf("Startup: Completed with %d changes found%n", changesFound);
Serengeti.server.serve();
}).start();
}
Expand Down Expand Up @@ -207,9 +214,8 @@ public void getNetworkIPsPorts() {
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String inputLine;
StringBuilder content = new StringBuilder();
while ((inputLine = in.readLine()) != null) {
while ((inputLine = in.readLine()) != null)
content.append(inputLine);
}

JSONObject jsonObj = new JSONObject(content.toString());
JSONObject nodeJSON = (JSONObject) jsonObj.get("this");
Expand All @@ -225,9 +231,8 @@ public void getNetworkIPsPorts() {

if (latencyRun) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (elapsedTime> latency) {
if (elapsedTime > latency)
latency = elapsedTime;
}
}


Expand Down Expand Up @@ -327,23 +332,22 @@ public String communicateQueryLogSingleNode(String id, String ip, String jsonStr

StringBuilder response = new StringBuilder();
try {
URL url2 = new URL("http://" + ip + ":" + Globals.port_default + "/post");
URL url2 = new URL(String.format("http://%s:%d/post", ip, Globals.port_default));
HttpURLConnection con2 = (HttpURLConnection) url2.openConnection();
con2.setRequestMethod("POST");
con2.setDoOutput(true);
con2.setConnectTimeout(networkTimeout);
con2.getOutputStream().write(jsonString.getBytes(StandardCharsets.UTF_8));

BufferedReader br = new BufferedReader(new InputStreamReader(con2.getInputStream()));
for (String line = br.readLine(); line != null; line = br.readLine()) {
for (String line = br.readLine(); line != null; line = br.readLine())
response.append(line);
}
return response.toString();
} catch (SocketException se) {
//System.out.println("Socket Exception (communicateQueryLogSingleNode): " + se.getMessage());
return "";
} catch (IOException ioe) {
System.out.println("IOException: " + ioe.getMessage()+ " > Tried: Communicating to " + ip + ": " + jsonString );
System.out.printf("IOException: %s > Tried: Communicating to %s: %s%n", ioe.getMessage(), ip, jsonString);
return "";
} catch (Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -415,20 +419,13 @@ public JSONObject getRandomAvailableNode() {

if (an.size() > 0) {
// Should first remove `self` from the list
Iterator it=an.entrySet().iterator();
while(it.hasNext()) {
Map.Entry<String, JSONObject> item = (Map.Entry<String, JSONObject>) it.next();
if (item.getValue().get("ip").toString().equals(myIP)) {
it.remove();
}
}
an.entrySet().removeIf(item -> item.getValue().get("ip").toString().equals(myIP));

if (an.size()==0) return null; // this can happen when there was only 1 node in the list and it was ourselves!

List<JSONObject> list = new ArrayList<JSONObject>();
for (String key: an.keySet()) {
for (String key: an.keySet())
list.add(an.get(key));
}

// Shuffle and send the first one back
Collections.shuffle(list);
Expand Down Expand Up @@ -463,10 +460,8 @@ public void getNetworkIPsOnly() {
System.out.println("Not Reachable: "+output);
}



} catch (Exception e) {
System.out.println(ip);
System.out.println(Arrays.toString(ip));
e.printStackTrace();
}
}).start();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/gl/ao/serengeti/query/QueryEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class QueryEngine {
* @param query
* @return List
*/
public static List query(String query) {
public static List<JSONObject> query(String query) {
if (query.trim().equals("")) {
return null;
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/gl/ao/serengeti/schema/DatabaseObject.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gl.ao.serengeti.schema;

import gl.ao.serengeti.data.DatabaseObjectData;
import gl.ao.serengeti.helpers.Globals;

import java.io.*;
Expand All @@ -15,6 +16,7 @@ public class DatabaseObject implements Serializable {
public String name = "";

public List<String> tables = new ArrayList<>();
// public DatabaseObjectData tables2 = new DatabaseObjectData();

public DatabaseObject() {}

Expand Down
22 changes: 8 additions & 14 deletions src/main/java/gl/ao/serengeti/schema/TableStorageObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ public String insert(String row_id, JSONObject json) {
rows.put(row_id, json.toString());
return row_id;
}
public boolean update(String row_id, JSONObject json) {
public void update(String row_id, JSONObject json) {
rows.replace(row_id, json.toString());
return true;
}
public boolean update(String update_key, String update_val, String where_col, String where_val) {
List<String> results = select(where_col, where_val);
Expand All @@ -61,9 +60,8 @@ public boolean update(String update_key, String update_val, String where_col, St

return false;
}
public boolean delete(String row_id) {
public void delete(String row_id) {
rows.remove(row_id);
return true;
}
public boolean delete(String where_col, String where_val) {
List<String> results = select(where_col, where_val);
Expand All @@ -78,7 +76,7 @@ public boolean delete(String where_col, String where_val) {

return false;
}
public List select(String col, String val) {
public List<String> select(String col, String val) {
List<String> ret = new ArrayList<>();

for (String key: rows.keySet()) {
Expand Down Expand Up @@ -113,24 +111,20 @@ public byte[] returnDBObytes() {
return Globals.convertToBytes(this);
}

public boolean saveToDisk() {
public void saveToDisk() {
ObjectOutputStream oos = null;
try {
FileOutputStream fos = new FileOutputStream(Globals.data_path + databaseName + "/" + tableName + "/" + Globals.storage_filename);
oos = new ObjectOutputStream(fos);
oos.writeObject(this);
return true;
} catch (Exception e) {
e.printStackTrace();
} finally {
if (oos != null) {
try {
oos.close();
} catch (IOException ex) {
ex.printStackTrace();
}
if (oos != null) try {
oos.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
return false;
}
}
Loading

0 comments on commit a232964

Please sign in to comment.