From c5544f56a08578b636a62448d41e5654c1de598a Mon Sep 17 00:00:00 2001 From: Miratepuffin Date: Tue, 8 Dec 2020 21:32:20 +0000 Subject: [PATCH] Shrank the Jar. Moved Bitcoin to Examples project --- mainproject/build.sbt | 36 +++---- .../scala/com/raphtory/api/S3Serialiser.scala | 35 ------ .../Workers/IngestionWorker.scala | 6 +- .../spouts/blockchain/BitcoinNodeSpout.scala | 79 -------------- .../spouts/blockchain/EthereumGethSpout.scala | 100 ------------------ .../blockchain/EthereumPostgresSpout.scala | 57 ---------- .../src/tests/scala/lotr/LOTRDeployment.scala | 3 +- .../tests/scala/lotr/LOTRGraphBuilder.scala | 2 +- .../src/tests/scala/lotr/LOTRSpout.scala | 2 +- 9 files changed, 24 insertions(+), 296 deletions(-) delete mode 100644 mainproject/src/main/scala/com/raphtory/api/S3Serialiser.scala delete mode 100644 mainproject/src/main/scala/com/raphtory/spouts/blockchain/BitcoinNodeSpout.scala delete mode 100644 mainproject/src/main/scala/com/raphtory/spouts/blockchain/EthereumGethSpout.scala delete mode 100644 mainproject/src/main/scala/com/raphtory/spouts/blockchain/EthereumPostgresSpout.scala diff --git a/mainproject/build.sbt b/mainproject/build.sbt index b5c07d8646..f1d43d863d 100644 --- a/mainproject/build.sbt +++ b/mainproject/build.sbt @@ -55,23 +55,23 @@ val kamon_prometheus = "io.kamon" %% "kamon-prometheus" % "2.1.0" val kamon_akka = "io.kamon" %% "kamon-akka" % "2.1.0" val kamon_system = "io.kamon" %% "kamon-system-metrics" % "2.1.0" val kamon_netty = "io.kamon" %% "kamon-netty" % "1.0.0" -val monix = "io.monix" %% "monix" % "3.0.0-RC1" +//val monix = "io.monix" %% "monix" % "3.0.0-RC1" val mongo = "org.mongodb" % "mongo-java-driver" % "3.12.4" val casbah = "org.mongodb" %% "casbah-core" % "3.1.1" -val doobie = "org.tpolecat" %% "doobie-core" % "0.8.4" -val doobiepostgres = - "org.tpolecat" %% "doobie-postgres" % "0.8.4" // Postgres driver 42.2.8 + type mappings. +//val doobie = "org.tpolecat" %% "doobie-core" % "0.8.4" +//val doobiepostgres = +// "org.tpolecat" %% "doobie-postgres" % "0.8.4" // Postgres driver 42.2.8 + type mappings. val lift = "net.liftweb" %% "lift-json" % "3.3.0" -val bitcoin = "org.scalaj" %% "scalaj-http" % "2.3.0" -val twitter_eval = "com.twitter" %% "util-eval" % "6.43.0" +//val bitcoin = "org.scalaj" %% "scalaj-http" % "2.3.0" +//val twitter_eval = "com.twitter" %% "util-eval" % "6.43.0" val hadoop = "org.apache.hadoop" % "hadoop-client" % "3.3.0" // https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk -val aws = "com.amazonaws" % "aws-java-sdk" % "1.11.897" +//val aws = "com.amazonaws" % "aws-java-sdk" % "1.11.897" val parquet = "com.github.mjakubowski84" %% "parquet4s-core" % "1.6.0" -val h3 = "com.uber" % "h3" % "3.6.4" +//val h3 = "com.uber" % "h3" % "3.6.4" val IP = java.net.InetAddress.getLocalHost.getHostAddress @@ -129,9 +129,9 @@ lazy val mergeStrategy: String => MergeStrategy = { case _ => MergeStrategy.first } -lazy val root = Project(id = "raphtory", base = file(".")) aggregate (cluster) +lazy val root = Project(id = "raphtory", base = file(".")) aggregate (raphtory) -lazy val cluster = project +lazy val raphtory = project .in(file(".")) .enablePlugins(JavaAppPackaging) .enablePlugins(AshScriptPlugin) @@ -172,22 +172,22 @@ lazy val cluster = project kamon_akka, kamon_prometheus, kamon_system, - monix, - bitcoin, - twitter_eval, + //monix, + //bitcoin, + //twitter_eval, lift, apacheLang, kafka, kafkac, - doobie, - doobiepostgres, + //doobie, + //doobiepostgres, joda, casbah, mongo, - aws, + //aws, parquet, - hadoop, - h3 + hadoop + // h3 ) ) .settings( diff --git a/mainproject/src/main/scala/com/raphtory/api/S3Serialiser.scala b/mainproject/src/main/scala/com/raphtory/api/S3Serialiser.scala deleted file mode 100644 index b03c4a5c70..0000000000 --- a/mainproject/src/main/scala/com/raphtory/api/S3Serialiser.scala +++ /dev/null @@ -1,35 +0,0 @@ -package com.raphtory.api - -import java.io.{BufferedWriter, File, FileWriter} - -import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials} -import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} - -abstract class S3Serialiser extends Serialiser { - val AWS_ACCESS_KEY = s"${sys.env.getOrElse("AWS_ACCESS_KEY", "")}" - val AWS_SECRET_KEY = s"${sys.env.getOrElse("AWS_SECRET_KEY", "")}" - val bucketName = s"${sys.env.getOrElse("AWS_BUCKET_NAME", "")}" - val yourAWSCredentials = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY) - //val amazonS3Client = new AmazonS3Client(yourAWSCredentials) - - - val s3Client: AmazonS3 = AmazonS3ClientBuilder.standard.withCredentials(new AWSStaticCredentialsProvider(yourAWSCredentials)).build - //s3Client.setRegion(com.amazonaws.regions.Region.getRegion(Regions.EU_WEST_2)) - try{s3Client.createBucket(bucketName)}catch {case e:Exception =>} - - override def write(serialisedResults:(Array[String],Array[String]),file:File) = { - println("vertices & edges", serialisedResults._1.length, serialisedResults._2.length) - val bw = new BufferedWriter(new FileWriter(file)) - bw.write(startOfFile()) - bw.write(serialisedResults._1.mkString(rowDelimeter())) - bw.write(middleOfFile()) - bw.write(serialisedResults._2.mkString(rowDelimeter())) - bw.write(endOfFile()) - bw.newLine() - bw.close() - new Thread(() => { - s3Client.putObject(bucketName, file.getName,file) - file.delete() - }).start() - } -} diff --git a/mainproject/src/main/scala/com/raphtory/core/actors/PartitionManager/Workers/IngestionWorker.scala b/mainproject/src/main/scala/com/raphtory/core/actors/PartitionManager/Workers/IngestionWorker.scala index 4d31699e91..64cd3c56fc 100644 --- a/mainproject/src/main/scala/com/raphtory/core/actors/PartitionManager/Workers/IngestionWorker.scala +++ b/mainproject/src/main/scala/com/raphtory/core/actors/PartitionManager/Workers/IngestionWorker.scala @@ -300,11 +300,11 @@ class IngestionWorker(workerId: Int,partitionID:Int, storage: EntityStorage) ext currentSafePoint = messageQueue.dequeue() else if (messageQueue.head.routerEpoch == currentSafePoint.routerEpoch) currentSafePoint = messageQueue.dequeue() - else { - currentSafePoint = messageQueue.dequeue() + //else { + // currentSafePoint = messageQueue.dequeue() //println(s"$increments Writer Worker $partitionID $workerId --- ${safeMessageMap.get(routerName).get} $currentSafePoint ${messageQueue.head.routerEpoch}") //break - } + //} } safeMessageMap put(routerName, currentSafePoint) currentSafePoint diff --git a/mainproject/src/main/scala/com/raphtory/spouts/blockchain/BitcoinNodeSpout.scala b/mainproject/src/main/scala/com/raphtory/spouts/blockchain/BitcoinNodeSpout.scala deleted file mode 100644 index c1eb4a9f6f..0000000000 --- a/mainproject/src/main/scala/com/raphtory/spouts/blockchain/BitcoinNodeSpout.scala +++ /dev/null @@ -1,79 +0,0 @@ -package com.raphtory.spouts.blockchain -// -import java.io.{File, PrintWriter} - -import com.raphtory.core.actors.Spout.Spout -import scalaj.http.{Http, HttpRequest} -import spray.json._ - -import scala.collection.mutable -import scala.language.postfixOps -import scala.sys.process._ -// -case class BitcoinTransaction(time: JsValue, block: Int, blockID: JsValue, transaction: JsValue) - -class BitcoinNodeSpout extends Spout[BitcoinTransaction]{ - - var blockcount = 1 - val rpcuser = System.getenv().getOrDefault("BITCOIN_USERNAME", "").trim - val rpcpassword = System.getenv().getOrDefault("BITCOIN_PASSWORD", "").trim - val serverAddress = System.getenv().getOrDefault("BITCOIN_NODE", "").trim - val id = "scala-jsonrpc" - val baseRequest = Http(serverAddress).auth(rpcuser, rpcpassword).header("content-type", "text/plain") - - override def setupDataSource(): Unit = {} - override def closeDataSource(): Unit = {} - - val queue = mutable.Queue[Option[BitcoinTransaction]]() - - override def generateData(): Option[BitcoinTransaction] = { - if(queue.isEmpty) - getTransactions() - queue.dequeue() - } - - def getTransactions(): Unit = { - try { - - val re = request("getblockhash", blockcount.toString).execute().body.toString.parseJson.asJsObject - val blockID = re.fields("result") - val blockData = request("getblock", s"$blockID,2").execute().body.toString.parseJson.asJsObject - val result = blockData.fields("result") - val time = result.asJsObject.fields("time") - for (transaction <- result.asJsObject().fields("tx").asInstanceOf[JsArray].elements) - queue += Some(BitcoinTransaction(time, blockcount, blockID, transaction)) - //val time = transaction.asJsObject.fields("time") - blockcount += 1 - } catch { - case e: java.net.SocketTimeoutException => queue += None - } - } - - //************* MESSAGE HANDLING BLOCK - def handleDomainMessage(): Unit = { - - } - - def outputScript() = { - val pw = new PrintWriter(new File("bitcoin.sh")) - pw.write("""curl --user $1:$2 --data-binary $3 -H 'content-type: text/plain;' $4""") - pw.close - "chmod 777 bitcoin.sh" ! - } - - def curlRequest(command: String, params: String): String = { - //val data = """{"jsonrpc":"1.0","id":"scala-jsonrpc","method":"getblockhash","params":[2]}""" - val data = s"""{"jsonrpc":"1.0","id":"$id","method":"$command","params":[$params]}""" - s"bash bitcoin.sh $rpcuser $rpcpassword $data $serverAddress" !! - } - - def request(command: String, params: String = ""): HttpRequest = - baseRequest.postData(s"""{"jsonrpc": "1.0", "id":"$id", "method": "$command", "params": [$params] }""") - - - - -} - - -//def request(command: String, params: String = ""): HttpRequest = baseRequest.postData(s"""{"jsonrpc": "1.0", "id":"$id", "method": "$command", "params": [$params] }""") diff --git a/mainproject/src/main/scala/com/raphtory/spouts/blockchain/EthereumGethSpout.scala b/mainproject/src/main/scala/com/raphtory/spouts/blockchain/EthereumGethSpout.scala deleted file mode 100644 index afd0cd63f0..0000000000 --- a/mainproject/src/main/scala/com/raphtory/spouts/blockchain/EthereumGethSpout.scala +++ /dev/null @@ -1,100 +0,0 @@ -package com.raphtory.spouts.blockchain - -import java.net.InetAddress -import java.util.NoSuchElementException - -import com.raphtory.core.actors.Spout.Spout -import scalaj.http.{Http, HttpRequest} -import spray.json.DefaultJsonProtocol._ -import spray.json._ - -import scala.collection.mutable -import scala.language.postfixOps - - - -class EthereumGethSpout extends Spout[String] { - - var currentBlock = System.getenv().getOrDefault("SPOUT_ETHEREUM_START_BLOCK_INDEX", "9014194").trim.toInt - var highestBlock = System.getenv().getOrDefault("SPOUT_ETHEREUM_MAXIMUM_BLOCK_INDEX", "10026447").trim.toInt - val nodeIP = System.getenv().getOrDefault("SPOUT_ETHEREUM_IP_ADDRESS", "127.0.0.1").trim - val nodePort = System.getenv().getOrDefault("SPOUT_ETHEREUM_PORT", "8545").trim -// val nodePort = System.getenv().getOrDefault("SPOUT_ETHEREUM_PORT", "30303").trim -def IPRegex = - "\\b(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\b" - print(currentBlock) - - val queue = mutable.Queue[Option[String]]() - - val baseRequest = requestBuilder() - - override def setupDataSource(): Unit = {} - override def closeDataSource(): Unit = {} - override def generateData(): Option[String] = { - if(queue.isEmpty) - pullNextBlock() - queue.dequeue() - } - - - - - - implicit val EthFormat = jsonFormat14(EthResult) - implicit val EthTransactionFormat = jsonFormat3(EthTransaction) - if (nodeIP.matches(IPRegex)) - println(s"Connecting to Ethereum RPC \n Address:$nodeIP \n Port:$nodePort") - else - println(s"Connecting to Ethereum RPC \n Address:${hostname2Ip(nodeIP)} \n Port:$nodePort") - - - def pullNextBlock(): Unit = { - if (currentBlock > highestBlock) - return - try { - val transactionCountHex = executeRequest("eth_getBlockTransactionCountByNumber", "\"0x" + currentBlock.toHexString + "\""); - val transactionCount = Integer.parseInt(transactionCountHex.fields("result").toString().drop(3).dropRight(1), 16) - if(transactionCount>0){ - var transactions = "[" - for (i <- 0 until transactionCount) - transactions = transactions + batchRequestBuilder("eth_getTransactionByBlockNumberAndIndex",s""""0x${currentBlock.toHexString}","0x${i.toHexString}"""")+"," - val trasnactionBlock = executeBatchRequest(transactions.dropRight(1)+"]") - val transList = trasnactionBlock.parseJson.convertTo[List[EthTransaction]] - transList.foreach(t => { //try needed to ignore contracts //todo include them - try{queue +=(Some(s"${t.result.blockNumber.get},${t.result.from.get},${t.result.to.get},${t.result.value.get}"))} - catch {case e:NoSuchElementException =>} - - }) - - } - currentBlock += 1 - } catch { - case e: NumberFormatException => - case e: Exception => e.printStackTrace(); - } - } - - - def batchRequestBuilder(command:String,params:String):String = s"""{"jsonrpc": "2.0", "id":"100", "method": "$command", "params": [$params]}""" - def executeBatchRequest(data: String) = requestBatch(data).execute().body.toString - def requestBatch(data: String): HttpRequest = baseRequest.postData(data) - def requestBuilder() = - if (nodeIP.matches(IPRegex)) - Http("http://" + nodeIP + ":" + nodePort).header("content-type", "application/json") - else - Http("http://" + hostname2Ip(nodeIP) + ":" + nodePort).header("content-type", "application/json") - def request(command: String, params: String = ""): HttpRequest = - baseRequest.postData(s"""{"jsonrpc": "2.0", "id":"100", "method": "$command", "params": [$params]}""") - - def executeRequest(command: String, params: String = "") = { - request(command, params).execute().body.toString.parseJson.asJsObject - } - - def hostname2Ip(hostname: String): String = InetAddress.getByName(hostname).getHostAddress() - - -} - -case class EthResult(blockHash:Option[String],blockNumber:Option[String],from:Option[String],gas:Option[String],gasPrice:Option[String],hash:Option[String],input:Option[String],nonce:Option[String],r:Option[String],s:Option[String],to:Option[String],transactionIndex:Option[String],v:Option[String],value:Option[String]) -case class EthTransaction(id:Option[String],jsonrpc:Option[String],result:EthResult) - diff --git a/mainproject/src/main/scala/com/raphtory/spouts/blockchain/EthereumPostgresSpout.scala b/mainproject/src/main/scala/com/raphtory/spouts/blockchain/EthereumPostgresSpout.scala deleted file mode 100644 index 01f92d7382..0000000000 --- a/mainproject/src/main/scala/com/raphtory/spouts/blockchain/EthereumPostgresSpout.scala +++ /dev/null @@ -1,57 +0,0 @@ -package com.raphtory.spouts.blockchain - -import cats.effect.{Blocker, IO} -import com.raphtory.core.actors.Spout.Spout -import doobie.implicits._ -import doobie.util.ExecutionContexts -import doobie.util.transactor.Transactor - -import scala.collection.mutable - -class EthereumPostgresSpout extends Spout[String]{ - var startBlock = System.getenv().getOrDefault("STARTING_BLOCK", "46147").trim.toInt //first block to have a transaction by default - val batchSize = System.getenv().getOrDefault("BLOCK_BATCH_SIZE", "100").trim.toInt //number of blocks to pull each query - val maxblock = System.getenv().getOrDefault("MAX_BLOCK", "8828337").trim.toInt //Maximum block in database to stop querying once this is reached - - val dbURL = System.getenv().getOrDefault("DB_URL", "jdbc:postgresql:ether").trim //db connection string, default is for local with db called ether - val dbUSER = System.getenv().getOrDefault("DB_USER", "postgres").trim //db user defaults to postgres - val dbPASSWORD = System.getenv().getOrDefault("DB_PASSWORD", "").trim //default no password - - // querying done with doobie wrapper for JDBC (https://tpolecat.github.io/doobie/) - implicit val cs = IO.contextShift(ExecutionContexts.synchronous) - val dbconnector = Transactor.fromDriverManager[IO]( - "org.postgresql.Driver", - dbURL, - dbUSER, - dbPASSWORD, - Blocker.liftExecutionContext(ExecutionContexts.synchronous) - ) - val queue = mutable.Queue[Option[String]]() - - override def generateData(): Option[String] = { - if(queue isEmpty) - pullBlocks() - queue.dequeue() - } - - protected def pullBlocks(): Unit = { - sql"select from_address, to_address, value,block_timestamp from transactions where block_number >= $startBlock AND block_number < ${startBlock + batchSize} " - .query[ - (String, String, String, String) - ] //get the to,from,value and time for transactions within the set block batch - .to[List] // ConnectionIO[List[String]] - .transact(dbconnector) // IO[List[String]] - .unsafeRunSync // List[String] - .foreach(x => queue+=(Some(x.toString()))) //send each transaction to the routers - - startBlock += batchSize//increment batch for the next query - if (startBlock <= maxblock) - dataSourceComplete()//if we have reached the max block we stop querying the database - - - - } - - override def setupDataSource(): Unit = {} - override def closeDataSource(): Unit = {} -} diff --git a/mainproject/src/tests/scala/lotr/LOTRDeployment.scala b/mainproject/src/tests/scala/lotr/LOTRDeployment.scala index 72e3b495d2..e83ec4d173 100644 --- a/mainproject/src/tests/scala/lotr/LOTRDeployment.scala +++ b/mainproject/src/tests/scala/lotr/LOTRDeployment.scala @@ -1,5 +1,4 @@ -package examples.lotr - +package lotr import com.raphtory.RaphtoryGraph import com.raphtory.algorithms.{ConnectedComponents, DegreeBasic} diff --git a/mainproject/src/tests/scala/lotr/LOTRGraphBuilder.scala b/mainproject/src/tests/scala/lotr/LOTRGraphBuilder.scala index 3775bfa265..9e8b82de7b 100644 --- a/mainproject/src/tests/scala/lotr/LOTRGraphBuilder.scala +++ b/mainproject/src/tests/scala/lotr/LOTRGraphBuilder.scala @@ -1,4 +1,4 @@ -package examples.lotr +package lotr import com.raphtory.core.actors.Router.GraphBuilder import com.raphtory.core.model.communication._ diff --git a/mainproject/src/tests/scala/lotr/LOTRSpout.scala b/mainproject/src/tests/scala/lotr/LOTRSpout.scala index 7e7f8d245d..3e0bd59de1 100644 --- a/mainproject/src/tests/scala/lotr/LOTRSpout.scala +++ b/mainproject/src/tests/scala/lotr/LOTRSpout.scala @@ -1,4 +1,4 @@ -package examples.lotr +package lotr import com.raphtory.core.actors.Spout.Spout