From 9dd9832e21d736a211084e402295417b68b5f3ba Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Thu, 1 Aug 2019 10:24:45 -0400 Subject: [PATCH 01/24] Add rail-related tracking features --- .../oneoffs/ChangesetORCCreator.scala | 175 ++++++++++++++++++ .../oneoffs/ChangesetStatsCreator.scala | 10 +- .../osmesa/analytics/stats/package.scala | 21 ++- 3 files changed, 199 insertions(+), 7 deletions(-) create mode 100644 src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala new file mode 100644 index 00000000..60b68555 --- /dev/null +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala @@ -0,0 +1,175 @@ +package osmesa.analytics.oneoffs + +import cats.data.{Validated, ValidatedNel} +import cats.implicits._ +import com.monovore.decline._ +import io.circe.generic.auto._ +import io.circe.{yaml, _} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.joda.time.DateTime +import org.joda.time.format.DateTimeFormat +import osmesa.analytics.Analytics +import vectorpipe.sources.{ChangesetSource, Source} + +import java.net.URI +import java.sql.Timestamp +import java.time.Instant +import scalaj.http.Http + +/* + * Usage example: + * + * sbt "project analytics" assembly + * + * spark-submit \ + * --class osmesa.analytics.oneoffs.ChangesetORCUpdater \ + * ingest/target/scala-2.11/osmesa-analytics.jar \ + * --changeset-source http://somewhere/diffs/ \ + * --database-url $DATABASE_URL + */ +object ChangesetORCUpdater + extends CommandApp( + name = "osmesa-changeset-stats-updater", + header = "Update statistics from augmented diffs", + main = { + + import ChangesetORCUpdaterUtils._ + + val changesetSourceOpt = + Opts + .option[URI]( + "changesets", + short = "c", + metavar = "uri", + help = "Location of replication changesets" + ) + .validate("Changeset source must have trailing '/'") { _.getPath.endsWith("/") } + + val endTimeOpt = + Opts + .option[Instant]("end-time", + short = "e", + metavar = "timestamp", + help = "Timestamp of stream end (of the form 2016-02-29T13:45:00Z); if absent, the time now will be used") + .orNone + + val orcArg = Opts + .argument[URI]("source ORC") + .validate("URI to ORC must have an s3 or file scheme") { _.getScheme != null } + .validate("orc must be an S3 or file Uri") { uri => + uri.getScheme.startsWith("s3") || uri.getScheme.startsWith("file") + } + .validate("orc must be an .orc file") { _.getPath.endsWith(".orc") } + + val outputArg = Opts.argument[URI]("destination ORC") + .validate("Output URI must have a scheme") { _.getScheme != null } + .validate("Output URI must have an S3 or file scheme") { uri => + uri.getScheme.startsWith("s3") || uri.getScheme.startsWith("file") + } + .validate("orc must be an .orc file") { _.getPath.endsWith(".orc") } + + (changesetSourceOpt, + endTimeOpt, + orcArg, + outputArg).mapN { + (changesetSource, endTime, orcUri, outputURI) => + implicit val spark: SparkSession = Analytics.sparkSession("ChangesetStatsUpdater") + + import spark.implicits._ + + val df = spark.read.orc(orcUri.toString) + val lastModified = df.select(max(coalesce('closed_at, 'created_at))).first.getAs[Timestamp](0) + + val startSequence = findSequenceFor(lastModified.toInstant, changesetSource) + val endSequence = endTime.map(findSequenceFor(_, changesetSource)).getOrElse(getCurrentSequence(changesetSource).sequence) + 
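+          // Why this works: the changeset replication endpoint publishes one
+          // state file per minute, so a sequence number can be estimated by
+          // subtracting the minutes elapsed since `lastModified` from the
+          // current sequence; findSequenceFor then walks that guess to the
+          // exact boundary. A minimal sketch of the estimate, mirroring
+          // estimateSequenceNumber below (the Sequence value is illustrative):
+          //   val current = getCurrentSequence(changesetSource)
+          //   // e.g. Sequence(last_run = 2019-08-01T14:24:00Z, sequence = 3667242)
+          //   val minutes = (current.last_run.toInstant.getMillis / 1000 -
+          //                   lastModified.toInstant.getEpochSecond) / 60
+          //   val guess   = current.sequence - minutes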
+ val options = Map( + Source.BaseURI -> changesetSource.toString, + Source.StartSequence -> startSequence.toString, + Source.EndSequence -> (endSequence + 1).toString // sequence range is (]; end sequence is exclusive + ) + + val changesets = spark.read.format(Source.Changesets).options(options).load + changesets + .repartition(1) + .write + .orc(outputURI.toString) + + spark.stop() + } + } +) + +object ChangesetORCUpdaterUtils { + implicit val readInstant: Argument[Instant] = new Argument[Instant] { + override def read(string: String): ValidatedNel[String, Instant] = { + try { Validated.valid(Instant.parse(string)) } + catch { case e: Exception => Validated.invalidNel(s"Invalid time: $string (${ e.getMessage })") } + } + + override def defaultMetavar: String = "time" + } + + // implicit val TimestampFormat : Decoder[Timestamp] = new Decoder[Timestamp] { + // private val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss XXX") + + // override def apply(c: HCursor): Decoder.Result[Timestamp] = + // c.get[String]("last_run").map{ str => + // val splt = str.split(".") + // Timestamp.from(sdf.parse(splt(0) ++ splt(1).split(" ")(1)).toInstant) + // } + // } + + private val formatter = DateTimeFormat.forPattern("y-M-d H:m:s.SSSSSSSSS Z") + + private implicit val dateTimeDecoder: Decoder[DateTime] = + Decoder.instance(a => a.as[String].map(DateTime.parse(_, formatter))) + + case class Sequence(last_run: DateTime, sequence: Long) + + def getCurrentSequence(baseURI: URI): Sequence = { + val response = + Http(baseURI.resolve("state.yaml").toString).asString + + val state = yaml.parser + .parse(response.body) + .leftMap(err => err: Error) + .flatMap(_.as[Sequence]) + .valueOr(throw _) + + state + } + + def getSequence(baseURI: URI, sequence: Long): Sequence = { + val s = f"${sequence+1}%09d" + val path = s"${s.slice(0, 3)}/${s.slice(3, 6)}/${s.slice(6, 9)}.state.txt" + + val response = + Http(baseURI.resolve(path).toString).asString + + val state = yaml.parser + .parse(response.body) + .leftMap(err => err: Error) + .flatMap(_.as[Sequence]) + .valueOr(throw _) + + state + } + + def estimateSequenceNumber(modifiedTime: Instant, baseURI: URI): Long = { + val current = getCurrentSequence(baseURI) + val diffMinutes = (current.last_run.toInstant.getMillis/1000 - modifiedTime.getEpochSecond) / 60 + current.sequence - diffMinutes + } + + def findSequenceFor(modifiedTime: Instant, baseURI: URI): Long = { + var guess = estimateSequenceNumber(modifiedTime, baseURI) + val target = org.joda.time.Instant.parse(modifiedTime.toString) + + while (getSequence(baseURI, guess).last_run.isAfter(target)) { guess -= 1 } + while (getSequence(baseURI, guess).last_run.isBefore(target)) { guess += 1 } + + getSequence(baseURI, guess).sequence + } +} diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala index e7d0af63..eae98d5f 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala @@ -111,17 +111,17 @@ object ChangesetStatsCreator 'tags.getItem("created_by") as 'editor, 'uid, 'user, - 'created_at, + 'createdAt, 'tags.getItem("comment") as 'comment, - 'tags.getItem("hashtags") as 'hashtag) - .agg(first('closed_at, ignoreNulls = true) as 'closed_at) + 'tags.getItem("hashtags") as 'hashtags) + .agg(first('closedAt, ignoreNulls = true) as 'closedAt) .select( 'id, 'editor, 
'uid, 'user, - 'created_at as 'createdAt, - 'closed_at as 'closedAt, + 'createdAt, + 'closedAt, merge_sets(hashtags('comment), hashtags('hashtags)) as 'hashtags ) diff --git a/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala b/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala index 7a07552a..eae6af03 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala @@ -46,13 +46,24 @@ package object stats { isCoastline(tags) or isPOI(tags) as 'isInterestingWay - def isLinear(tags: Column): Column = isRoad(tags) or isWaterway(tags) or isCoastline(tags) as 'isLinear + def isRailSite(tags: Column): Column = + array_contains(splitDelimitedValues(tags.getItem("railway")), "station") or + array_contains(splitDelimitedValues(tags.getItem("railway")), "yard") or + array_contains(splitDelimitedValues(tags.getItem("landuse")), "railway") as 'isRailSite + + def isRailLine(tags: Column): Column = not(isRailSite(tags)) and tags.getItem("railway").isNotNull as 'isRailLine + + def isRailway(tags: Column): Column = + tags.getItem("railway").isNotNull or array_contains(splitDelimitedValues(tags.getItem("landuse")), "railway") as 'isRailway + + def isLinear(tags: Column): Column = isRoad(tags) or isWaterway(tags) or isCoastline(tags) or isRailLine(tags) as 'isLinear def isOther(tags: Column): Column = isTagged(tags) and not(isRoad((tags))) and not(isWaterway(tags)) and not(isCoastline(tags)) and not(isBuilding(tags)) and + not(isRailway(tags)) and not(isPOI(tags)) as 'isOther def DefaultMeasurements(implicit sparkSession: SparkSession): Column = { @@ -67,7 +78,10 @@ package object stats { lit("waterway_km_deleted"), (isWaterway('tags) and !'visible).cast(IntegerType) * 'delta / 1000, lit("coastline_km_added"), (isCoastline('tags) and isNew('version, 'minorVersion)).cast(IntegerType) * 'delta / 1000, lit("coastline_km_modified"), (isCoastline('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType) * 'delta / 1000, - lit("coastline_km_deleted"), (isCoastline('tags) and !'visible).cast(IntegerType) * 'delta / 1000 + lit("coastline_km_deleted"), (isCoastline('tags) and !'visible).cast(IntegerType) * 'delta / 1000, + lit("railway_km_added"), (isRailLine('tags) and isNew('version, 'minorVersion)).cast(IntegerType) * 'delta / 1000, + lit("railway_km_modified"), (isRailLine('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType) * 'delta / 1000, + lit("railway_km_deleted"), (isRailLine('tags) and !'visible).cast(IntegerType) * 'delta / 1000 )) as 'measurements } @@ -87,6 +101,9 @@ package object stats { lit("buildings_added"), (isBuilding('tags) and isNew('version, 'minorVersion)).cast(IntegerType), lit("buildings_modified"), (isBuilding('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), lit("buildings_deleted"), (isBuilding('tags) and !'visible).cast(IntegerType), + lit("railways_added"), (isRailway('tags) and isNew('version, 'minorVersion)).cast(IntegerType), + lit("railways_modified"), (isRailway('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), + lit("railways_deleted"), (isRailway('tags) and !'visible).cast(IntegerType), lit("pois_added"), (isPOI('tags) and isNew('version, 'minorVersion)).cast(IntegerType), lit("pois_modified"), (isPOI('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), lit("pois_deleted"), (isPOI('tags) and !'visible).cast(IntegerType), From 
2132083be5a8848b4166f0586a473399af45e3fb Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Thu, 1 Aug 2019 10:37:06 -0400 Subject: [PATCH 02/24] Adjust definitions of rail features, disaggregate rail counts --- .../osmesa/analytics/stats/package.scala | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala b/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala index eae6af03..2fa082ce 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala @@ -46,13 +46,16 @@ package object stats { isCoastline(tags) or isPOI(tags) as 'isInterestingWay - def isRailSite(tags: Column): Column = + // Does this feature represent a rail-related site or area (not track) + def isRailFeature(tags: Column): Column = array_contains(splitDelimitedValues(tags.getItem("railway")), "station") or array_contains(splitDelimitedValues(tags.getItem("railway")), "yard") or array_contains(splitDelimitedValues(tags.getItem("landuse")), "railway") as 'isRailSite - def isRailLine(tags: Column): Column = not(isRailSite(tags)) and tags.getItem("railway").isNotNull as 'isRailLine + // Does this feature represent a section of rail track + def isRailLine(tags: Column): Column = not(isRailFeature(tags)) and tags.getItem("railway").isNotNull as 'isRailLine + // Does this feature represent a rail-related entity def isRailway(tags: Column): Column = tags.getItem("railway").isNotNull or array_contains(splitDelimitedValues(tags.getItem("landuse")), "railway") as 'isRailway @@ -79,9 +82,9 @@ package object stats { lit("coastline_km_added"), (isCoastline('tags) and isNew('version, 'minorVersion)).cast(IntegerType) * 'delta / 1000, lit("coastline_km_modified"), (isCoastline('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType) * 'delta / 1000, lit("coastline_km_deleted"), (isCoastline('tags) and !'visible).cast(IntegerType) * 'delta / 1000, - lit("railway_km_added"), (isRailLine('tags) and isNew('version, 'minorVersion)).cast(IntegerType) * 'delta / 1000, - lit("railway_km_modified"), (isRailLine('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType) * 'delta / 1000, - lit("railway_km_deleted"), (isRailLine('tags) and !'visible).cast(IntegerType) * 'delta / 1000 + lit("railline_km_added"), (isRailLine('tags) and isNew('version, 'minorVersion)).cast(IntegerType) * 'delta / 1000, + lit("railline_km_modified"), (isRailLine('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType) * 'delta / 1000, + lit("railline_km_deleted"), (isRailLine('tags) and !'visible).cast(IntegerType) * 'delta / 1000 )) as 'measurements } @@ -101,9 +104,12 @@ package object stats { lit("buildings_added"), (isBuilding('tags) and isNew('version, 'minorVersion)).cast(IntegerType), lit("buildings_modified"), (isBuilding('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), lit("buildings_deleted"), (isBuilding('tags) and !'visible).cast(IntegerType), - lit("railways_added"), (isRailway('tags) and isNew('version, 'minorVersion)).cast(IntegerType), - lit("railways_modified"), (isRailway('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), - lit("railways_deleted"), (isRailway('tags) and !'visible).cast(IntegerType), + lit("railway_features_added"), (isRailFeature('tags) and isNew('version, 'minorVersion)).cast(IntegerType), + lit("railway_features_modified"), 
(isRailFeature('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), + lit("railway_features_deleted"), (isRailFeature('tags) and !'visible).cast(IntegerType), + lit("raillines_added"), (isRailLine('tags) and isNew('version, 'minorVersion)).cast(IntegerType), + lit("raillines_modified"), (isRailLine('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), + lit("raillines_deleted"), (isRailLine('tags) and !'visible).cast(IntegerType), lit("pois_added"), (isPOI('tags) and isNew('version, 'minorVersion)).cast(IntegerType), lit("pois_modified"), (isPOI('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), lit("pois_deleted"), (isPOI('tags) and !'visible).cast(IntegerType), From d3152eeab78327be7805c2af2da8be49d9073613 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Thu, 1 Aug 2019 16:39:04 -0400 Subject: [PATCH 03/24] Fix casting error with JSON --- .../analytics/stats/ChangesetStatsForeachWriter.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/analytics/src/main/scala/osmesa/analytics/stats/ChangesetStatsForeachWriter.scala b/src/analytics/src/main/scala/osmesa/analytics/stats/ChangesetStatsForeachWriter.scala index 28f5a0d9..e263e60e 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/stats/ChangesetStatsForeachWriter.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/stats/ChangesetStatsForeachWriter.scala @@ -23,7 +23,7 @@ class ChangesetStatsForeachWriter(databaseUri: URI, | ?::jsonb AS measurements, | ?::jsonb AS counts, | ? AS total_edits, - | ? AS augmented_diffs, + | ?::integer[] AS augmented_diffs, | current_timestamp AS updated_at |) |INSERT INTO changesets AS c ( @@ -41,7 +41,7 @@ class ChangesetStatsForeachWriter(databaseUri: URI, | measurements = ( | SELECT jsonb_object_agg(key, value) | FROM ( - | SELECT key, sum(value::numeric) AS value + | SELECT key, sum((value->>0)::numeric) AS value | FROM ( | SELECT * from jsonb_each(c.measurements) | UNION ALL @@ -54,7 +54,7 @@ class ChangesetStatsForeachWriter(databaseUri: URI, | counts = ( | SELECT jsonb_object_agg(key, value) | FROM ( - | SELECT key, sum(value::numeric) AS value + | SELECT key, sum((value->>0)::numeric) AS value | FROM ( | SELECT * from jsonb_each(c.counts) | UNION ALL From 3762f2364fd4d8826ed58e7359585ee499b04239 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Fri, 2 Aug 2019 11:22:52 -0400 Subject: [PATCH 04/24] Unify table schemas and union --- .../analytics/oneoffs/ChangesetORCCreator.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala index 60b68555..57f47d59 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala @@ -92,6 +92,22 @@ object ChangesetORCUpdater val changesets = spark.read.format(Source.Changesets).options(options).load changesets + .drop("comments", "sequence") + .union(df.select( + 'id, + 'tags, + 'created_at as 'createdAt, + 'open, + 'closed_at as 'closedAt, + 'comments_count as 'commentsCount, + 'min_lat as 'minLat, + 'max_lat as 'maxLat, + 'min_lon as 'minLon, + 'max_lon as 'maxLon, + 'num_changes as 'numChanges, + 'uid, + 'user) + ) .repartition(1) .write .orc(outputURI.toString) From e97e21f252d4bd15d407cc7c7e0a611ab3af3644 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Fri, 2 
Aug 2019 11:24:30 -0400 Subject: [PATCH 05/24] Address minor PR review comments --- .../analytics/oneoffs/ChangesetORCCreator.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala index 57f47d59..c1e976f3 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala @@ -30,8 +30,8 @@ import scalaj.http.Http */ object ChangesetORCUpdater extends CommandApp( - name = "osmesa-changeset-stats-updater", - header = "Update statistics from augmented diffs", + name = "osmesa-changeset-orc-updater", + header = "Bring existing changesets ORC file up to date using changeset stream", main = { import ChangesetORCUpdaterUtils._ @@ -127,16 +127,6 @@ object ChangesetORCUpdaterUtils { override def defaultMetavar: String = "time" } - // implicit val TimestampFormat : Decoder[Timestamp] = new Decoder[Timestamp] { - // private val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss XXX") - - // override def apply(c: HCursor): Decoder.Result[Timestamp] = - // c.get[String]("last_run").map{ str => - // val splt = str.split(".") - // Timestamp.from(sdf.parse(splt(0) ++ splt(1).split(" ")(1)).toInstant) - // } - // } - private val formatter = DateTimeFormat.forPattern("y-M-d H:m:s.SSSSSSSSS Z") private implicit val dateTimeDecoder: Decoder[DateTime] = From 729259a676df0d97040d8d82bf447d2f98493a8e Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Fri, 2 Aug 2019 11:37:03 -0400 Subject: [PATCH 06/24] Address minor PR review comments --- .../scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala index c1e976f3..ccea6f8e 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala @@ -154,13 +154,11 @@ object ChangesetORCUpdaterUtils { val response = Http(baseURI.resolve(path).toString).asString - val state = yaml.parser + yaml.parser .parse(response.body) .leftMap(err => err: Error) .flatMap(_.as[Sequence]) .valueOr(throw _) - - state } def estimateSequenceNumber(modifiedTime: Instant, baseURI: URI): Long = { From 627f3e4256889fb258794bbc79bd98c8d92b38b1 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Tue, 6 Aug 2019 12:58:53 -0400 Subject: [PATCH 07/24] Write end position in augdiff and changeset streams to checkpoints table during bulk ingest --- .../oneoffs/ChangesetMetadataUpdater.scala | 3 +- .../oneoffs/ChangesetORCCreator.scala | 33 ++++++++++++++-- .../oneoffs/ChangesetStatsCreator.scala | 39 +++++++++++++++++-- .../oneoffs/ChangesetStatsUpdater.scala | 3 +- 4 files changed, 69 insertions(+), 9 deletions(-) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala index 827af724..619cc903 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala @@ -80,7 +80,8 @@ object ChangesetMetadataUpdater import 
ss.implicits._ val options = Map( - Source.BaseURI -> changesetSource.toString + Source.BaseURI -> changesetSource.toString, + Source.ProcessName -> "ChangesetStream" ) ++ startSequence .map(s => Map(Source.StartSequence -> s.toString)) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala index ccea6f8e..312caa8e 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala @@ -11,9 +11,10 @@ import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat import osmesa.analytics.Analytics import vectorpipe.sources.{ChangesetSource, Source} +import vectorpipe.util.DBUtils import java.net.URI -import java.sql.Timestamp +import java.sql._ import java.time.Instant import scalaj.http.Http @@ -25,8 +26,10 @@ import scalaj.http.Http * spark-submit \ * --class osmesa.analytics.oneoffs.ChangesetORCUpdater \ * ingest/target/scala-2.11/osmesa-analytics.jar \ - * --changeset-source http://somewhere/diffs/ \ - * --database-url $DATABASE_URL + * --changesets http://location/of/changeset/replications \ + * --end-time 1970-01-01T13:00:00Z + * s3://path/to/history.orc + * s3://path/to/output.orc */ object ChangesetORCUpdater extends CommandApp( @@ -74,7 +77,7 @@ object ChangesetORCUpdater orcArg, outputArg).mapN { (changesetSource, endTime, orcUri, outputURI) => - implicit val spark: SparkSession = Analytics.sparkSession("ChangesetStatsUpdater") + implicit val spark: SparkSession = Analytics.sparkSession("ChangesetORCCreator") import spark.implicits._ @@ -176,4 +179,26 @@ object ChangesetORCUpdaterUtils { getSequence(baseURI, guess).sequence } + + def saveLocations(procName: String, sequence: Int, databaseURI: URI) = { + var connection = null.asInstanceOf[Connection] + try { + connection = DBUtils.getJdbcConnection(databaseURI) + val upsertSequence = + connection.prepareStatement( + """ + |INSERT INTO checkpoints (proc_name, sequence) + |VALUES (?, ?) + |ON CONFLICT (proc_name) + |DO UPDATE SET sequence = ? 
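+          |-- Note (hedged): ON CONFLICT (proc_name) presumes a unique
+          |-- constraint or index on checkpoints.proc_name; with that in
+          |-- place this statement is an idempotent upsert that keeps exactly
+          |-- one resume sequence per process name.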
+ """.stripMargin + ) + upsertSequence.setString(1, procName) + upsertSequence.setInt(2, sequence) + upsertSequence.setInt(3, sequence) + upsertSequence.execute() + } finally { + if (connection != null) connection.close() + } + } } diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala index eae98d5f..7ef56ac2 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala @@ -1,6 +1,7 @@ package osmesa.analytics.oneoffs import java.net.URI +import java.sql._ import cats.implicits._ import com.monovore.decline.{CommandApp, Opts} @@ -13,7 +14,7 @@ import osmesa.analytics.stats.functions._ import vectorpipe.{internal => ProcessOSM} import vectorpipe.functions._ import vectorpipe.functions.osm._ -import vectorpipe.util.Geocode +import vectorpipe.util.{DBUtils, Geocode} object ChangesetStatsCreator extends CommandApp( @@ -27,6 +28,16 @@ object ChangesetStatsCreator Opts .option[String]("changesets", help = "Location of the Changesets ORC file to process.") + val changesetBaseOpt = + Opts + .option[URI]( + "changeset-stream", + short = "c", + metavar = "uri", + help = "HTTP Location of replication changesets" + ) + .validate("Changeset source must have trailing '/'") { _.getPath.endsWith("/") } + val databaseUrlOpt = Opts .option[URI]( @@ -37,12 +48,20 @@ object ChangesetStatsCreator ) .orElse(Opts.env[URI]("DATABASE_URL", help = "The URL of the database")) - (historyOpt, changesetsOpt, databaseUrlOpt).mapN { - (historySource, changesetSource, databaseUrl) => + (historyOpt, changesetsOpt, changesetBaseOpt, databaseUrlOpt).mapN { + (historySource, changesetSource, changesetBaseURI, databaseUrl) => implicit val spark: SparkSession = Analytics.sparkSession("ChangesetStats") import spark.implicits._ + val logger = org.apache.log4j.Logger.getLogger(getClass()) + val history = spark.read.orc(historySource) + + val augdiffEndSequence = { + val t = history.select(max('timestamp)).first.getAs[java.sql.Timestamp](0).toInstant + ((t.getEpochSecond - 1347432900) / 60).toInt + } + val nodes = ProcessOSM.preprocessNodes(history) val ways = ProcessOSM.preprocessWays(history) @@ -105,6 +124,10 @@ object ChangesetStatsCreator ) val changesets = spark.read.orc(changesetSource) + val changesetsEndSequence = { + val t = changesets.select(max(coalesce('createdAt, 'closedAt))).first.getAs[java.sql.Timestamp](0) + ChangesetORCUpdaterUtils.findSequenceFor(t.toInstant, changesetBaseURI).toInt + } val changesetMetadata = changesets .groupBy('id, @@ -155,6 +178,16 @@ object ChangesetStatsCreator } }) + // Distributing these writes to the executors to avoid no suitable driver errors on master node + logger.warn(s"Writing AugmentedDiffStream sequence number as $augdiffEndSequence to $databaseUrl") + spark.sparkContext.parallelize(Seq(databaseUrl)).foreach { uri => + ChangesetORCUpdaterUtils.saveLocations("AugmentedDiffStream", augdiffEndSequence, uri) + } + logger.warn(s"Writing ChangesetStream sequence number as $changesetsEndSequence to $databaseUrl") + spark.sparkContext.parallelize(Seq(databaseUrl)).foreach { uri => + ChangesetORCUpdaterUtils.saveLocations("ChangesetStream", changesetsEndSequence, uri) + } + spark.stop() } } diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala 
b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala index 81d2b5d6..6d48a9de 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala @@ -91,7 +91,8 @@ object ChangesetStatsUpdater import ss.implicits._ val options = Map( - Source.BaseURI -> augmentedDiffSource.toString + Source.BaseURI -> augmentedDiffSource.toString, + Source.ProcessName -> "AugmentedDiffStream" ) ++ startSequence .map(s => Map(Source.StartSequence -> s.toString)) From ed5e6c35e834c5972e23c8a07092b76d4541380b Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Fri, 16 Aug 2019 16:17:19 -0400 Subject: [PATCH 08/24] Introduce new Makefile for streaming deployment --- deployment/build-container.sh | 12 +- deployment/streaming/.gitignore | 6 +- deployment/streaming/Makefile | 494 ++++++++++++++---- deployment/streaming/config-aws.mk.example | 25 - .../streaming/config-deployment.mk.template | 39 ++ deployment/streaming/config-local.mk.example | 7 - .../streaming/docker-compose.deploy.yml.tpl | 42 -- .../streaming/docker-compose.local.yml.tpl | 51 -- deployment/streaming/docker-compose.yml | 47 -- deployment/streaming/get-tag.sh | 17 + 10 files changed, 470 insertions(+), 270 deletions(-) delete mode 100644 deployment/streaming/config-aws.mk.example create mode 100644 deployment/streaming/config-deployment.mk.template delete mode 100644 deployment/streaming/config-local.mk.example delete mode 100644 deployment/streaming/docker-compose.deploy.yml.tpl delete mode 100644 deployment/streaming/docker-compose.local.yml.tpl delete mode 100644 deployment/streaming/docker-compose.yml create mode 100755 deployment/streaming/get-tag.sh diff --git a/deployment/build-container.sh b/deployment/build-container.sh index 379c47b2..3ff8a0d3 100755 --- a/deployment/build-container.sh +++ b/deployment/build-container.sh @@ -1,5 +1,14 @@ #!/bin/bash +if [ -z ${VERSION_TAG+x} ]; then + echo "No version tag has been set. Do not run this script directly; rather issue" + echo " make build-container" + echo "from the 'streaming' directory." 
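+  # Hedged usage note: VERSION_TAG is normally exported by the Makefile
+  # (derived from get-tag.sh) when running `make build-container`; setting it
+  # by hand, e.g. `VERSION_TAG=latest ./build-container.sh`, also satisfies
+  # this guard.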
+ exit 1 +else + echo "Version tag is set to '${VERSION_TAG}'" +fi + set -xe SBT_DIR="../src" JAR_DIR=${SBT_DIR}/analytics/target/scala-2.11/ @@ -10,4 +19,5 @@ cd ${SBT_DIR} cp ${JAR_DIR}/osmesa-analytics.jar ${DOCKER_DIR}/osmesa-analytics.jar cd ${DOCKER_DIR} -docker build -f ${DOCKER_DIR}/Dockerfile --tag osm_analytics:latest ${DOCKER_DIR} + +docker build -f ${DOCKER_DIR}/Dockerfile --tag osm_analytics:${VERSION_TAG} ${DOCKER_DIR} diff --git a/deployment/streaming/.gitignore b/deployment/streaming/.gitignore index 8019c7be..32e2785d 100644 --- a/deployment/streaming/.gitignore +++ b/deployment/streaming/.gitignore @@ -1,5 +1 @@ -repository -docker-compose.local.yml -docker-compose.deploy.yml -config-local.mk -config-aws.mk +config-*.mk diff --git a/deployment/streaming/Makefile b/deployment/streaming/Makefile index a3483dda..07801c74 100644 --- a/deployment/streaming/Makefile +++ b/deployment/streaming/Makefile @@ -1,104 +1,414 @@ -include config-aws.mk # Variables for AWS options -include config-local.mk # Variables related to running locally +include config-deployment.mk -# The osmesa container -LOCAL_IMG := osm_analytics:latest +# If the user is on master branch, see if we should deploy to production +VERSION_TAG=$(shell ./get-tag.sh) +ifeq ($(VERSION_TAG), production) + DATABASE=${PRODUCTION_DB} + ECS_CLUSTER=osm-stat-stream-cluster + TASK_SUFFIX= +else + DATABASE=${STAGING_DB} + ECS_CLUSTER=osm-stats-staging + TASK_SUFFIX="-staging" +endif +DB_URI=${DB_BASE_URI}${DATABASE} +############################# +# Docker image management # +############################# -######### -# LOCAL # -######### -docker-compose.local.yml: - export LOCAL_IMG=${LOCAL_IMG}; \ - export AUGDIFF_SOURCE=${LOCAL_AUGDIFF_SOURCE}; \ - export CHANGESET_SOURCE=${LOCAL_CHANGESET_SOURCE}; \ - export CHANGE_SOURCE=${LOCAL_CHANGE_SOURCE}; \ - export AUGDIFF_START=${LOCAL_AUGDIFF_START}; \ - export CHANGESET_START=${LOCAL_CHANGESET_START}; \ - export CHANGE_START=${LOCAL_CHANGE_START}; \ - ./expand.sh docker-compose.local.yml.tpl > docker-compose.local.yml +.PHONY: build-container login-aws-registry tag-image push-image -start-local: docker-compose.local.yml - docker-compose -f docker-compose.local.yml up - -stop-local: - docker-compose -f docker-compose.local.yml down - - -######### -# AWS # -######### +build-container: + cd .. 
&& VERSION_TAG=${VERSION_TAG} ./build-container.sh login-aws-registry: eval `aws ecr get-login --no-include-email --region ${AWS_REGION}` -tag-image: - docker tag ${LOCAL_IMG} ${ECR_REPO} +tag-image: build-container + docker tag osm_analytics:${VERSION_TAG} ${ECR_IMAGE}:${VERSION_TAG} push-image: login-aws-registry tag-image - docker push ${ECR_REPO} - -.PHONY: docker-compose.deploy.yml - -docker-compose.deploy.yml: docker-compose.deploy.yml.tpl - export ECR_REPO=${ECR_REPO} - export AWS_LOG_GROUP=${AWS_LOG_GROUP}; \ - export AWS_REGION=${AWS_REGION}; \ - export AUGDIFF_SOURCE=${AUGDIFF_SOURCE}; \ - export AUGDIFF_START=${AUGDIFF_START}; \ - export CHANGESET_SOURCE=${CHANGESET_SOURCE}; \ - export CHANGESET_START=${CHANGESET_START}; \ - export DB_URI=${DB_URI}; \ - ./expand.sh $< > $@ - -.PHONY: configure-cluster - -configure-cluster: - ecs-cli configure \ - --cluster ${CLUSTER_NAME} \ - --region ${AWS_REGION} \ - --config-name ${CONFIG_NAME} - -cluster-up: - ecs-cli up \ - --keypair ${KEYPAIR} \ - --instance-role ${INSTANCE_ROLE} \ - --security-group ${SECURITY_GROUP} \ - --size 1 \ - --instance-type ${INSTANCE_TYPE} \ - --cluster-config ${CONFIG_NAME} \ - --subnets ${SUBNETS} \ - --vpc ${VPC} \ - --force \ - --verbose - -cluster-down: - ecs-cli down --cluster-config ${CONFIG_NAME} - -.PHONY: create-service - -create-service: docker-compose.deploy.yml configure-cluster - ecs-cli compose \ - --file $< create \ - --cluster ${CLUSTER_NAME} - -start-service: docker-compose.deploy.yml configure-cluster create-service - ecs-cli compose --file $< service up \ - --deployment-min-healthy-percent 0 \ - --create-log-groups \ - --cluster ${CLUSTER_NAME} - -stop-service: docker-compose.deploy.yml - ecs-cli compose --file $< down - - -######### -# ALL # -######### -build-container: - cd .. 
&& ./build-container.sh + docker push ${ECR_IMAGE}${VERSION_TAG} + +####################### +# Streaming AWS Tasks # +####################### + +.PHONY: define-tasks configure-cluster + +create-log-groups: + aws logs create-log-group \ + --log-group-name /ecs/${AWS_LOG_GROUP} + aws logs create-log-group \ + --log-group-name /ecs/${AWS_LOG_GROUP}-staging + aws logs create-log-group \ + --log-group-name /ecs/streaming-user-footprint-tile-updater + aws logs create-log-group \ + --log-group-name /ecs/streaming-edit-histogram-tile-updater + +define-streaming-vectortile-tasks: + aws ecs register-task-definition \ + --family osmesa-streaming-edit-histogram-tile-updater \ + --task-role-arn "arn:aws:iam:::role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam:::role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "2 GB" \ + --container-definitions '[ + { + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/streaming-edit-histogram-tile-updater", + "awslogs-region": "${AWS_REGION}", + "awslogs-stream-prefix": "ecs" + } + }, + "command": [ + "/spark/bin/spark-submit", + "--driver-memory", "2048m", + "--class", "osmesa.analytics.oneoffs.StreamingFacetedEditHistogramTileUpdater", + "/opt/osmesa-analytics.jar", + "--augmented-diff-source", "${AUGDIFF_SOURCE}", + "--tile-source", "${HISTOGRAM_VT_LOCATION}" + ], + "environment": [ + { + "name": "DATABASE_URL", + "value": "${DB_BASE_URI}${PRODUCTION_DB}" + } + ], + "image": "${ECR_IMAGE}", + "name": "streaming-edit-histogram-tile-updater" + }, + { + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/streaming-user-footprint-tile-updater", + "awslogs-region": "${AWS_REGION}", + "awslogs-stream-prefix": "ecs" + } + }, + "command": [ + "/spark/bin/spark-submit", + "--driver-memory", "2048m", + "--class", "osmesa.analytics.oneoffs.StreamingUserFootprintTileUpdater", + "/opt/osmesa-analytics.jar", + "--change-source", "${CHANGESET_SOURCE}", + "--tile-source", "${FOOTPRINT_VT_LOCATION}" + ], + "environment": [ + { + "name": "DATABASE_URL", + "value": "${DB_BASE_URI}${PRODUCTION_DB}" + } + ], + "image": "${ECR_IMAGE}:production", + "name": "streaming-user-footprint-tile-updater" + } + ]' + +define-staging-streaming-update-tasks: + aws ecs register-task-definition \ + --family osmesa-streaming-stats-updater \ + --task-role-arn "arn:aws:iam:::role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam:::role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "4 GB" \ + --container-definitions '[ + { + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/${AWS_LOG_GROUP}-staging", + "awslogs-region": "${AWS_REGION}", + "awslogs-stream-prefix": "ecs" + } + }, + "command": [ + "/spark/bin/spark-submit", + "--driver-memory", "2048m", + "--class", "osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor", + "/opt/osmesa-analytics.jar", + "--augmented-diff-source", "${AUGDIFF_SOURCE}" + ], + "environment": [ + { + "name": "DATABASE_URL", + "value": "${DB_BASE_URI}${STAGING_DB}" + } + ], + "image": "${ECR_IMAGE}:latest", + "name": "streaming-augmented-diffs-stats-updater-staging" + }, + { + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/${AWS_LOG_GROUP}-staging", + "awslogs-region": "${AWS_REGION}", + "awslogs-stream-prefix": "ecs" + } + }, + "command": [ + "/spark/bin/spark-submit", + 
"--driver-memory", "2048m", + "--class", "osmesa.analytics.oneoffs.ChangesetStreamProcessor", + "/opt/osmesa-analytics.jar", + "--changeset-source", "${CHANGESET_SOURCE}" + ], + "environment": [ + { + "name": "DATABASE_URL", + "value": "${DB_URI}${STAGING_DB}" + } + ], + "image": "${ECR_IMAGE}:latest", + "name": "streaming-changesets-stats-updater-staging" + } + ]' + +define-production-streaming-update-tasks: + aws ecs register-task-definition \ + --family osmesa-streaming-stats-updater \ + --task-role-arn "arn:aws:iam:::role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam:::role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "4 GB" \ + --container-definitions '[ + { + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/${AWS_LOG_GROUP}", + "awslogs-region": "${AWS_REGION}", + "awslogs-stream-prefix": "ecs" + } + }, + "command": [ + "/spark/bin/spark-submit", + "--driver-memory", "2048m", + "--class", "osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor", + "/opt/osmesa-analytics.jar", + "--augmented-diff-source", "${AUGDIFF_SOURCE}" + ], + "environment": [ + { + "name": "DATABASE_URL", + "value": "${DB_URI}${PRODUCTION_DB}" + } + ], + "image": "${ECR_IMAGE}:production", + "name": "streaming-augmented-diffs-stats-updater" + }, + { + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/${AWS_LOG_GROUP}", + "awslogs-region": "${AWS_REGION}", + "awslogs-stream-prefix": "ecs" + } + }, + "command": [ + "/spark/bin/spark-submit", + "--driver-memory", "2048m", + "--class", "osmesa.analytics.oneoffs.ChangesetStreamProcessor", + "/opt/osmesa-analytics.jar", + "--changeset-source", "${CHANGESET_SOURCE}" + ], + "environment": [ + { + "name": "DATABASE_URL", + "value": "${DB_URI}${PRODUCTION_DB}" + } + ], + "image": "${ECR_IMAGE}:production", + "name": "streaming-changesets-stats-updater" + } + ]' + +deploy-streaming-footprint-updater: + aws ecs create-service \ + --cluster "osm-stat-stream-cluster" \ + --service-name "streaming-user-footprint-tile-updater" \ + --task-definition "streaming-edit-histogram-tile-updater" \ + --desired-count 1 \ + --launch-type FARGATE \ + --scheduling-strategy REPLICA \ + --network-configuration '{ + "awsvpcConfiguration": { + "subnets": ["${ECS_SUBNET}"], + "securityGroups": ["${ECS_SECURITY_GROUP}"], + "assignPublicIp": "DISABLED" + } + }' + +deploy-streaming-edit-histogram-updater: + aws ecs create-service \ + --cluster "osm-stat-stream-cluster" \ + --service-name "streaming-edit-histogram-tile-updater" \ + --task-definition "streaming-edit-histogram-tile-updater" \ + --desired-count 1 \ + --launch-type FARGATE \ + --scheduling-strategy REPLICA \ + --network-configuration '{ + "awsvpcConfiguration": { + "subnets": ["${ECS_SUBNET}"], + "securityGroups": ["${ECS_SECURITY_GROUP}"], + "assignPublicIp": "DISABLED" + } + }' + +deploy-streaming-stats-updaters: + aws ecs create-service \ + --cluster "${ECS_CLUSTER}" \ + --service-name "streaming-augdiffs-stats-updater" \ + --task-definition "streaming-augmented-diffs-stats-updater${TASK_SUFFIX}" \ + --desired-count 1 \ + --launch-type FARGATE \ + --scheduling-strategy REPLICA \ + --network-configuration '{ + "awsvpcConfiguration": { + "subnets": ["${ECS_SUBNET}"], + "securityGroups": ["${ECS_SECURITY_GROUP}"], + "assignPublicIp": "DISABLED" + } + }' + aws ecs create-service \ + --cluster "${ECS_CLUSTER}" \ + --service-name "streaming-changesets-stats-updater" \ + 
--task-definition "streaming-changesets-stats-updater${TASK_SUFFIX}" \ + --desired-count 1 \ + --launch-type FARGATE \ + --scheduling-strategy REPLICA \ + --network-configuration '{ + "awsvpcConfiguration": { + "subnets": ["${ECS_SUBNET}"], + "securityGroups": ["${ECS_SECURITY_GROUP}"], + "assignPublicIp": "DISABLED" + } + }' + +deploy-all-streaming-tasks: deploy-footprint-updater deploy-streaming-edit-histogram-updater deploy-streaming-stats-updaters + +################### +# Batch AWS Tasks # +################### -clean: - rm -f docker-compose.local.yml - rm -f docker-compose.deploy.yml +batch-generate-footprints: + aws emr create-cluster \ + --applications Name=Ganglia Name=Spark \ + --ebs-root-volume-size 10 \ + --ec2-attributes '{ + "KeyName": "${KEYPAIR}", + "InstanceProfile":"EMR_EC2_DefaultRole", + "ServiceAccessSecurityGroup": "${SERVICE_ACCESS_SECURITY_GROUP}", + "SubnetId": "${SUBNET}", + "EmrManagedSlaveSecurityGroup": "${EMR_SLAVE_SECURITY_GROUP}", + "EmrManagedMasterSecurityGroup": "${EMR_MASTER_SECURITY_GROUP}" + }' \ + --service-role EMR_DefaultRole \ + --release-label emr-5.19.0 \ + --name 'User footprint tile generation' \ + --instance-groups '[ + { + "InstanceCount": 1, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "MASTER", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Master" + }, { + "InstanceCount": 20, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "CORE", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Workers" + } + ]' \ + --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ + --auto-terminate \ + --region ${AWS_REGION} \ + --steps '[ + { + "Args": [ + "spark-submit", + "--deploy-mode", "cluster", + "--class", "osmesa.analytics.oneoffs.FootprintCommand", + "--conf", "spark.executor.memoryOverhead=2g", + "--conf", "spark.sql.shuffle.partitions=2000", + "--conf", "spark.speculation=true", + "${OSMESA_ANALYTICS_JAR}", + "--history", "${NOME_HISTORY_ORC}", + "--changesets", "${NOME_CHANGESETS_ORC", + "--out", "${FOOTPRINT_VT_LOCATION}", + "--type", "users", + ], + "Type": "CUSTOM_JAR", + "ActionOnFailure": "TERMINATE_CLUSTER", + "Jar": "command-runner.jar", + "Properties": "", + "Name": "FootprintCommand" + } + ]' +batch-generate-edit-histograms: + aws emr create-cluster \ + --applications Name=Ganglia Name=Spark \ + --ebs-root-volume-size 10 \ + --ec2-attributes '{ + "KeyName": "${KEYPAIR}", + "InstanceProfile":"EMR_EC2_DefaultRole", + "ServiceAccessSecurityGroup": "${SERVICE_ACCESS_SECURITY_GROUP}", + "SubnetId": "${SUBNET}", + "EmrManagedSlaveSecurityGroup": "${EMR_SLAVE_SECURITY_GROUP}", + "EmrManagedMasterSecurityGroup": "${EMR_MASTER_SECURITY_GROUP}" + }' \ + --service-role EMR_DefaultRole \ + --release-label emr-5.19.0 \ + --name 'Faceted State of the Data tile generation' \ + --instance-groups '[ + { + "InstanceCount": 1, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "MASTER", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Master" + }, { + "InstanceCount": 20, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "CORE", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Workers" + } + ]' \ + --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ + --auto-terminate \ + --region us-east-1 \ + --steps '[ + { + "Args": [ + "spark-submit", + "--deploy-mode", "cluster", + "--class", "osmesa.analytics.oneoffs.FacetedEditHistogramTileCreator", + "--conf", "spark.executor.memoryOverhead=2g", + "--conf", "spark.sql.shuffle.partitions=2000", + "--conf", "spark.speculation=true", + "${OSMESA_ANALYTICS_JAR}", + "--history", 
"${NOME_HISTORY_ORC}", + "--out", "${HISTOGRAM_VT_LOCATION}" + ], + "Type": "CUSTOM_JAR", + "ActionOnFailure": "TERMINATE_CLUSTER", + "Jar": "command-runner.jar", + "Properties": "", + "Name": "FacetedEditHistogramTileCreator" + } + ]' diff --git a/deployment/streaming/config-aws.mk.example b/deployment/streaming/config-aws.mk.example deleted file mode 100644 index 03e55711..00000000 --- a/deployment/streaming/config-aws.mk.example +++ /dev/null @@ -1,25 +0,0 @@ -export CONFIG_NAME := osm-stat-stream-config - -# AWS properties -export CLUSTER_NAME := osm-stat-stream-cluster -export INSTANCE_TYPE := m4.xlarge -export KEYPAIR := [AWS key pair] -export VPC := [VPC ID] -export SUBNETS := [comma-delimited list of subnets within the above VPC] -export SECURITY_GROUP := [comma-delimited list of AWS Security Group IDs] -export ECR_REPO := [AWS ECR repo URI] -export AWS_LOG_GROUP := osm-stats-stream -export AWS_REGION := us-east-1 -export INSTANCE_ROLE := [IAM instance role] - -export AUGDIFF_SOURCE := s3://path/to/augdiffs/ -export CHANGESET_SOURCE := https://planet.osm.org/replication/changesets/ -export AUGDIFF_START := [Start of Augdiff stream (for stats)] -export CHANGE_START := [Start of change stream (for user footprints)] -export CHANGESET_START := [Start of changeset stream (for stats)] -export OVERPASS_URL := [alternative Overpass URL] -export TILE_SOURCE := s3://path/to/user/footprints/ - - -export DB_URI := [URI to DB for writing outputs from stream] - diff --git a/deployment/streaming/config-deployment.mk.template b/deployment/streaming/config-deployment.mk.template new file mode 100644 index 00000000..e6fad1c2 --- /dev/null +++ b/deployment/streaming/config-deployment.mk.template @@ -0,0 +1,39 @@ +################################################################################ +# AWS properties +################################################################################ +export KEYPAIR := +export SUBNET := +export AWS_REGION := us-east-1 + +################################################################################ +# Streaming resource definitions +################################################################################ +export CLUSTER_NAME := osm-stat-stream-cluster +export STREAMING_INSTANCE_TYPE := m4.xlarge +export ECR_IMAGE := +export AWS_LOG_GROUP := streaming-stats-updater +export ECS_SUBNET := ${SUBNET} +export ECS_SECURITY_GROUP := + +export AUGDIFF_SOURCE := +export CHANGESET_SOURCE := + +export DB_BASE_URI := +export PRODUCTION_DB := +export STAGING_DB := + +################################################################################ +# Batch resource definitions +################################################################################ +export SERVICE_ACCESS_SECURITY_GROUP := ${ECS_SECURITY_GROUP} +export EMR_MASTER_SECURITY_GROUP := +export EMR_SLAVE_SECURITY_GROUP := + +export BATCH_INSTANCE_TYPE := m4.xlarge +export OSMESA_ANALYTICS_JAR := s3:///osmesa-analytics.jar + +export NOME_HISTORY_ORC := +export NOME_CHANGESETS_ORC := + +export FOOTPRINT_VT_LOCATION := +export HISTOGRAM_VT_LOCATION := diff --git a/deployment/streaming/config-local.mk.example b/deployment/streaming/config-local.mk.example deleted file mode 100644 index 02a525e9..00000000 --- a/deployment/streaming/config-local.mk.example +++ /dev/null @@ -1,7 +0,0 @@ -export LOCAL_AUGDIFF_SOURCE := s3://path/to/augdiffs/ -export LOCAL_CHANGE_SOURCE := https://planet.osm.org/replication/minute/ -export LOCAL_CHANGESET_SOURCE := https://planet.osm.org/replication/changesets/ -export 
LOCAL_AUGDIFF_START := [Start of augdiff stream] -export LOCAL_CHANGE_START := [Start of change stream] # see https://planet.osm.org/replication/minute/state.txt -export LOCAL_CHANGESET_START := [Start of changeset stream] # see https://planet.osm.org/replication/changesets/state.yaml - diff --git a/deployment/streaming/docker-compose.deploy.yml.tpl b/deployment/streaming/docker-compose.deploy.yml.tpl deleted file mode 100644 index 2cb6e6ad..00000000 --- a/deployment/streaming/docker-compose.deploy.yml.tpl +++ /dev/null @@ -1,42 +0,0 @@ -version: '3.0' -services: - augdiff-stream: - image: ${ECR_REPO}:latest - command: > - /spark/bin/spark-submit --driver-memory 2048m --class osmesa.analytics.oneoffs.StreamingChangesetStatsUpdater /opt/osmesa-analytics.jar - --augmented-diff-source ${AUGDIFF_SOURCE} - --start-sequence ${AUGDIFF_START} - --database-uri ${DB_URI} - logging: - driver: awslogs - options: - awslogs-group: ${AWS_LOG_GROUP} - awslogs-region: ${AWS_REGION} - awslogs-stream-prefix: augdiff - changeset-stream: - image: ${ECR_REPO}:latest - command: > - /spark/bin/spark-submit --driver-memory 2048m --class osmesa.analytics.oneoffs.StreamingChangesetMetadataUpdater /opt/osmesa-analytics.jar - --changeset-source ${CHANGESET_SOURCE} - --start-sequence ${CHANGESET_START} - --database-uri ${DB_URI} - logging: - driver: awslogs - options: - awslogs-group: ${AWS_LOG_GROUP} - awslogs-region: ${AWS_REGION} - awslogs-stream-prefix: changeset - user-footprint-updater: - image: ${ECR_REPO}:latest - command: > - /spark/bin/spark-submit --driver-memory 4096m --class osmesa.analytics.oneoffs.StreamingUserFootprintTileUpdater /opt/osmesa-analytics.jar - --change-source ${CHANGE_SOURCE} - --changes-start-sequence ${CHANGE_START} - --database-uri ${DB_URI} - --tile-source ${TILE_SOURCE} - logging: - driver: awslogs - options: - awslogs-group: ${AWS_LOG_GROUP} - awslogs-region: ${AWS_REGION} - awslogs-stream-prefix: user-footprints diff --git a/deployment/streaming/docker-compose.local.yml.tpl b/deployment/streaming/docker-compose.local.yml.tpl deleted file mode 100644 index 0ea66c72..00000000 --- a/deployment/streaming/docker-compose.local.yml.tpl +++ /dev/null @@ -1,51 +0,0 @@ -version: '3.0' -services: - db: - image: postgres:10.5 - volumes: - - ../sql:/docker-entrypoint-initdb.d - environment: - - POSTGRES_PASSWORD=pgsecret - networks: - db: - aliases: - - database - changeset-stream: - image: ${LOCAL_IMG} - volumes: - - ./log4j.properties:/spark/conf/log4j.properties - command: > - /spark/bin/spark-submit --class osmesa.analytics.oneoffs.ChangesetStreamProcessor /opt/osmesa-analytics.jar - --changeset-source ${CHANGESET_SOURCE} - --start-sequence ${CHANGESET_START} - --database-uri postgresql://postgres:pgsecret@database:5432/postgres - networks: - - db - augdiff-stream: - image: ${LOCAL_IMG} - volumes: - - ~/.aws:/root/.aws - - ./log4j.properties:/spark/conf/log4j.properties - environment: - - AWS_PROFILE - command: > - /spark/bin/spark-submit --class osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor /opt/osmesa-analytics.jar - --augmented-diff-source ${AUGDIFF_SOURCE} - --start-sequence ${AUGDIFF_START} - --database-uri postgresql://postgres:pgsecret@database:5432/postgres - networks: - - db - change-stream: - image: ${LOCAL_IMG} - volumes: - - ./log4j.properties:/spark/conf/log4j.properties - command: > - /spark/bin/spark-submit --class osmesa.analytics.oneoffs.ChangeStreamProcessor /opt/osmesa-analytics.jar - --change-source ${CHANGE_SOURCE} - --start-sequence ${CHANGE_START} - 
--database-uri postgresql://postgres:pgsecret@database:5432/postgres - networks: - - db -networks: - db: - diff --git a/deployment/streaming/docker-compose.yml b/deployment/streaming/docker-compose.yml deleted file mode 100644 index dbf0a149..00000000 --- a/deployment/streaming/docker-compose.yml +++ /dev/null @@ -1,47 +0,0 @@ -version: '3.7' -services: - db: - image: postgres:10.5 - volumes: - - ../sql:/docker-entrypoint-initdb.d - environment: - - POSTGRES_PASSWORD=streamtest - networks: - db: - aliases: - - database - augdiff-stream: - image: osm_analytics:latest - command: > - /spark/bin/spark-submit - --class osmesa.analytics.oneoffs.StreamingChangesetStatsUpdater - /opt/osmesa-analytics.jar - --augmented-diff-source ${AUGDIFF_SOURCE} - --database-url postgresql://postgres:streamtest@database/postgres - --start-sequence 1 - deploy: - restart_policy: - condition: on-failure - delay: 1s - max_attempts: 10 - window: 120s - networks: - - db - changeset-stream: - image: osm_analytics:latest - command: > - /spark/bin/spark-submit - --class osmesa.analytics.oneoffs.StreamingChangesetMetadataUpdater - /opt/osmesa-analytics.jar - --start-sequence 1 - --database-url postgresql://postgres:streamtest@database/postgres - deploy: - restart_policy: - condition: on-failure - delay: 1s - max_attempts: 10 - window: 120s - networks: - - db -networks: - db: diff --git a/deployment/streaming/get-tag.sh b/deployment/streaming/get-tag.sh new file mode 100755 index 00000000..388bc0dd --- /dev/null +++ b/deployment/streaming/get-tag.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ "$(git branch | grep '* master')" = "* master" ]; then + while true; do + echo "You are on the master branch. Do you wish to publish to the production tag?" + select yn in "Yes" "No"; do + case $yn in + Yes ) VERSION_TAG="production"; break;; + No ) VERSION_TAG="latest"; break;; + esac + done + done +else + VERSION_TAG="latest" +fi + +echo "${VERSION_TAG}" From 6fd09669be291790f01467c6b0ffcbe9d43d27b7 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Mon, 19 Aug 2019 11:39:33 -0400 Subject: [PATCH 09/24] Clean up and improve streaming deployment scripts --- deployment/build-container.sh | 2 +- deployment/streaming/Makefile | 214 ++---------------- .../streaming/config-deployment.mk.template | 1 + .../streaming/scripts/create-log-groups.sh | 28 +++ ...efine-production-streaming-update-tasks.sh | 67 ++++++ .../define-staging-streaming-update-tasks.sh | 67 ++++++ .../define-streaming-vectortile-tasks.sh | 69 ++++++ deployment/streaming/{ => scripts}/expand.sh | 0 deployment/streaming/{ => scripts}/get-tag.sh | 0 9 files changed, 249 insertions(+), 199 deletions(-) create mode 100755 deployment/streaming/scripts/create-log-groups.sh create mode 100755 deployment/streaming/scripts/define-production-streaming-update-tasks.sh create mode 100755 deployment/streaming/scripts/define-staging-streaming-update-tasks.sh create mode 100644 deployment/streaming/scripts/define-streaming-vectortile-tasks.sh rename deployment/streaming/{ => scripts}/expand.sh (100%) rename deployment/streaming/{ => scripts}/get-tag.sh (100%) diff --git a/deployment/build-container.sh b/deployment/build-container.sh index 3ff8a0d3..aa0417c3 100755 --- a/deployment/build-container.sh +++ b/deployment/build-container.sh @@ -1,7 +1,7 @@ #!/bin/bash if [ -z ${VERSION_TAG+x} ]; then - echo "No version tag has been set. Do not run this script directly; rather issue" + echo "No version tag has been set. 
Do not run this script directly; instead, issue" echo " make build-container" echo "from the 'streaming' directory." exit 1 diff --git a/deployment/streaming/Makefile b/deployment/streaming/Makefile index 07801c74..84866847 100644 --- a/deployment/streaming/Makefile +++ b/deployment/streaming/Makefile @@ -1,18 +1,20 @@ include config-deployment.mk # If the user is on master branch, see if we should deploy to production -VERSION_TAG=$(shell ./get-tag.sh) +VERSION_TAG=$(shell ./scripts/get-tag.sh) ifeq ($(VERSION_TAG), production) DATABASE=${PRODUCTION_DB} - ECS_CLUSTER=osm-stat-stream-cluster + ECS_CLUSTER=${CLUSTER_NAME_DEPLOYMENT} TASK_SUFFIX= else DATABASE=${STAGING_DB} - ECS_CLUSTER=osm-stats-staging + ECS_CLUSTER=${CLUSTER_NAME_STAGING} TASK_SUFFIX="-staging" endif DB_URI=${DB_BASE_URI}${DATABASE} +.EXPORT_ALL_VARIABLES: + ############################# # Docker image management # ############################# @@ -29,215 +31,29 @@ tag-image: build-container docker tag osm_analytics:${VERSION_TAG} ${ECR_IMAGE}:${VERSION_TAG} push-image: login-aws-registry tag-image - docker push ${ECR_IMAGE}${VERSION_TAG} + docker push ${ECR_IMAGE}:${VERSION_TAG} ####################### # Streaming AWS Tasks # ####################### -.PHONY: define-tasks configure-cluster +.PHONY: create-log-groups define-streaming-vectortile-tasks define-staging-streaming-update-tasks define-production-streaming-update-tasks deploy-streaming-footprint-updater deploy-streaming-edit-histogram-updater deploy-streaming-stats-updaters create-log-groups: - aws logs create-log-group \ - --log-group-name /ecs/${AWS_LOG_GROUP} - aws logs create-log-group \ - --log-group-name /ecs/${AWS_LOG_GROUP}-staging - aws logs create-log-group \ - --log-group-name /ecs/streaming-user-footprint-tile-updater - aws logs create-log-group \ - --log-group-name /ecs/streaming-edit-histogram-tile-updater + ./scripts/create-log-groups.sh define-streaming-vectortile-tasks: - aws ecs register-task-definition \ - --family osmesa-streaming-edit-histogram-tile-updater \ - --task-role-arn "arn:aws:iam:::role/ECSTaskS3" \ - --execution-role-arn "arn:aws:iam:::role/ecsTaskExecutionRole" \ - --network-mode awsvpc \ - --requires-compatibilities EC2 FARGATE \ - --cpu "1 vCPU" \ - --memory "2 GB" \ - --container-definitions '[ - { - "logConfiguration": { - "logDriver": "awslogs", - "options": { - "awslogs-group": "/ecs/streaming-edit-histogram-tile-updater", - "awslogs-region": "${AWS_REGION}", - "awslogs-stream-prefix": "ecs" - } - }, - "command": [ - "/spark/bin/spark-submit", - "--driver-memory", "2048m", - "--class", "osmesa.analytics.oneoffs.StreamingFacetedEditHistogramTileUpdater", - "/opt/osmesa-analytics.jar", - "--augmented-diff-source", "${AUGDIFF_SOURCE}", - "--tile-source", "${HISTOGRAM_VT_LOCATION}" - ], - "environment": [ - { - "name": "DATABASE_URL", - "value": "${DB_BASE_URI}${PRODUCTION_DB}" - } - ], - "image": "${ECR_IMAGE}", - "name": "streaming-edit-histogram-tile-updater" - }, - { - "logConfiguration": { - "logDriver": "awslogs", - "options": { - "awslogs-group": "/ecs/streaming-user-footprint-tile-updater", - "awslogs-region": "${AWS_REGION}", - "awslogs-stream-prefix": "ecs" - } - }, - "command": [ - "/spark/bin/spark-submit", - "--driver-memory", "2048m", - "--class", "osmesa.analytics.oneoffs.StreamingUserFootprintTileUpdater", - "/opt/osmesa-analytics.jar", - "--change-source", "${CHANGESET_SOURCE}", - "--tile-source", "${FOOTPRINT_VT_LOCATION}" - ], - "environment": [ - { - "name": "DATABASE_URL", - "value": 
"${DB_BASE_URI}${PRODUCTION_DB}" - } - ], - "image": "${ECR_IMAGE}:production", - "name": "streaming-user-footprint-tile-updater" - } - ]' + ./scripts/define-streaming-vectortile-tasks.sh define-staging-streaming-update-tasks: - aws ecs register-task-definition \ - --family osmesa-streaming-stats-updater \ - --task-role-arn "arn:aws:iam:::role/ECSTaskS3" \ - --execution-role-arn "arn:aws:iam:::role/ecsTaskExecutionRole" \ - --network-mode awsvpc \ - --requires-compatibilities EC2 FARGATE \ - --cpu "1 vCPU" \ - --memory "4 GB" \ - --container-definitions '[ - { - "logConfiguration": { - "logDriver": "awslogs", - "options": { - "awslogs-group": "/ecs/${AWS_LOG_GROUP}-staging", - "awslogs-region": "${AWS_REGION}", - "awslogs-stream-prefix": "ecs" - } - }, - "command": [ - "/spark/bin/spark-submit", - "--driver-memory", "2048m", - "--class", "osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor", - "/opt/osmesa-analytics.jar", - "--augmented-diff-source", "${AUGDIFF_SOURCE}" - ], - "environment": [ - { - "name": "DATABASE_URL", - "value": "${DB_BASE_URI}${STAGING_DB}" - } - ], - "image": "${ECR_IMAGE}:latest", - "name": "streaming-augmented-diffs-stats-updater-staging" - }, - { - "logConfiguration": { - "logDriver": "awslogs", - "options": { - "awslogs-group": "/ecs/${AWS_LOG_GROUP}-staging", - "awslogs-region": "${AWS_REGION}", - "awslogs-stream-prefix": "ecs" - } - }, - "command": [ - "/spark/bin/spark-submit", - "--driver-memory", "2048m", - "--class", "osmesa.analytics.oneoffs.ChangesetStreamProcessor", - "/opt/osmesa-analytics.jar", - "--changeset-source", "${CHANGESET_SOURCE}" - ], - "environment": [ - { - "name": "DATABASE_URL", - "value": "${DB_URI}${STAGING_DB}" - } - ], - "image": "${ECR_IMAGE}:latest", - "name": "streaming-changesets-stats-updater-staging" - } - ]' + ./scripts/define-staging-streaming-update-tasks.sh define-production-streaming-update-tasks: - aws ecs register-task-definition \ - --family osmesa-streaming-stats-updater \ - --task-role-arn "arn:aws:iam:::role/ECSTaskS3" \ - --execution-role-arn "arn:aws:iam:::role/ecsTaskExecutionRole" \ - --network-mode awsvpc \ - --requires-compatibilities EC2 FARGATE \ - --cpu "1 vCPU" \ - --memory "4 GB" \ - --container-definitions '[ - { - "logConfiguration": { - "logDriver": "awslogs", - "options": { - "awslogs-group": "/ecs/${AWS_LOG_GROUP}", - "awslogs-region": "${AWS_REGION}", - "awslogs-stream-prefix": "ecs" - } - }, - "command": [ - "/spark/bin/spark-submit", - "--driver-memory", "2048m", - "--class", "osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor", - "/opt/osmesa-analytics.jar", - "--augmented-diff-source", "${AUGDIFF_SOURCE}" - ], - "environment": [ - { - "name": "DATABASE_URL", - "value": "${DB_URI}${PRODUCTION_DB}" - } - ], - "image": "${ECR_IMAGE}:production", - "name": "streaming-augmented-diffs-stats-updater" - }, - { - "logConfiguration": { - "logDriver": "awslogs", - "options": { - "awslogs-group": "/ecs/${AWS_LOG_GROUP}", - "awslogs-region": "${AWS_REGION}", - "awslogs-stream-prefix": "ecs" - } - }, - "command": [ - "/spark/bin/spark-submit", - "--driver-memory", "2048m", - "--class", "osmesa.analytics.oneoffs.ChangesetStreamProcessor", - "/opt/osmesa-analytics.jar", - "--changeset-source", "${CHANGESET_SOURCE}" - ], - "environment": [ - { - "name": "DATABASE_URL", - "value": "${DB_URI}${PRODUCTION_DB}" - } - ], - "image": "${ECR_IMAGE}:production", - "name": "streaming-changesets-stats-updater" - } - ]' + ./scripts/define-production-streaming-update-tasks.sh deploy-streaming-footprint-updater: aws ecs 
create-service \
-	  --cluster "osm-stat-stream-cluster" \
+	  --cluster "${CLUSTER_NAME_DEPLOYMENT}" \
 	  --service-name "streaming-user-footprint-tile-updater" \
 	  --task-definition "streaming-edit-histogram-tile-updater" \
 	  --desired-count 1 \
@@ -253,7 +69,7 @@ deploy-streaming-footprint-updater:
 
 deploy-streaming-edit-histogram-updater:
 	aws ecs create-service \
-	  --cluster "osm-stat-stream-cluster" \
+	  --cluster "${CLUSTER_NAME_DEPLOYMENT}" \
 	  --service-name "streaming-edit-histogram-tile-updater" \
 	  --task-definition "streaming-edit-histogram-tile-updater" \
 	  --desired-count 1 \
@@ -297,7 +113,9 @@ deploy-streaming-stats-updaters:
 	    }
 	  }'
 
-deploy-all-streaming-tasks: deploy-footprint-updater deploy-streaming-edit-histogram-updater deploy-streaming-stats-updaters
+deploy-streaming-vectortile-tasks: deploy-streaming-footprint-updater deploy-streaming-edit-histogram-updater
+
+deploy-all-streaming-tasks: deploy-streaming-vectortile-tasks deploy-streaming-stats-updaters
 
 ###################
 # Batch AWS Tasks #
diff --git a/deployment/streaming/config-deployment.mk.template b/deployment/streaming/config-deployment.mk.template
index e6fad1c2..f9897d66 100644
--- a/deployment/streaming/config-deployment.mk.template
+++ b/deployment/streaming/config-deployment.mk.template
@@ -4,6 +4,7 @@
 export KEYPAIR :=
 export SUBNET :=
 export AWS_REGION := us-east-1
+export IAM_ACCOUNT :=
 
 ################################################################################
 # Streaming resource definitions
diff --git a/deployment/streaming/scripts/create-log-groups.sh b/deployment/streaming/scripts/create-log-groups.sh
new file mode 100755
index 00000000..edfd45aa
--- /dev/null
+++ b/deployment/streaming/scripts/create-log-groups.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+if [ -z ${AWS_REGION+x} ]; then
+  echo "Do not run this script directly. Use the Makefile in the parent directory."
+  exit 1
+fi
+
+DEFINED_GROUPS=$(aws logs describe-log-groups | jq '.logGroups[].logGroupName' | sed -e 's/"//g')
+
+if [[ $DEFINED_GROUPS != *"/ecs/${AWS_LOG_GROUP}"* ]]; then
+  aws logs create-log-group \
+    --log-group-name /ecs/${AWS_LOG_GROUP}
+fi
+
+if [[ $DEFINED_GROUPS != *"/ecs/${AWS_LOG_GROUP}-staging"* ]]; then
+  aws logs create-log-group \
+    --log-group-name /ecs/${AWS_LOG_GROUP}-staging
+fi
+
+if [[ $DEFINED_GROUPS != *"/ecs/streaming-user-footprint-tile-updater"* ]]; then
+  aws logs create-log-group \
+    --log-group-name /ecs/streaming-user-footprint-tile-updater
+fi
+
+if [[ $DEFINED_GROUPS != *"/ecs/streaming-edit-histogram-tile-updater"* ]]; then
+  aws logs create-log-group \
+    --log-group-name /ecs/streaming-edit-histogram-tile-updater
+fi
diff --git a/deployment/streaming/scripts/define-production-streaming-update-tasks.sh b/deployment/streaming/scripts/define-production-streaming-update-tasks.sh
new file mode 100755
index 00000000..ed3d759d
--- /dev/null
+++ b/deployment/streaming/scripts/define-production-streaming-update-tasks.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+
+if [ -z ${AWS_REGION+x} ]; then
+  echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws ecs register-task-definition \ + --family osmesa-streaming-stats-updater \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "4 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor\", + \"/opt/osmesa-analytics.jar\", + \"--augmented-diff-source\", \"${AUGDIFF_SOURCE}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_URI}${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:production\", + \"name\": \"streaming-augmented-diffs-stats-updater\" + }, + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.ChangesetStreamProcessor\", + \"/opt/osmesa-analytics.jar\", + \"--changeset-source\", \"${CHANGESET_SOURCE}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_URI}${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:production\", + \"name\": \"streaming-changesets-stats-updater\" + } + ]" diff --git a/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh b/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh new file mode 100755 index 00000000..21f63eb4 --- /dev/null +++ b/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh @@ -0,0 +1,67 @@ +#!/bin/bash + +if [ -z ${AWS_REGION+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws ecs register-task-definition \ + --family osmesa-streaming-stats-updater \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "4 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}-staging\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor\", + \"/opt/osmesa-analytics.jar\", + \"--augmented-diff-source\", \"${AUGDIFF_SOURCE}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}${STAGING_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"streaming-augmented-diffs-stats-updater-staging\" + }, + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}-staging\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.ChangesetStreamProcessor\", + \"/opt/osmesa-analytics.jar\", + \"--changeset-source\", \"${CHANGESET_SOURCE}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_URI}${STAGING_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"streaming-changesets-stats-updater-staging\" + } + ]" diff --git a/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh b/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh new file mode 100644 index 00000000..b38617bf --- /dev/null +++ b/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh @@ -0,0 +1,69 @@ +#!/bin/bash + +if [ -z ${AWS_REGION+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws ecs register-task-definition \ + --family osmesa-streaming-edit-histogram-tile-updater \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "2 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/streaming-edit-histogram-tile-updater\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.StreamingFacetedEditHistogramTileUpdater\", + \"/opt/osmesa-analytics.jar\", + \"--augmented-diff-source\", \"${AUGDIFF_SOURCE}\", + \"--tile-source\", \"${HISTOGRAM_VT_LOCATION}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}\", + \"name\": \"streaming-edit-histogram-tile-updater\" + }, + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/streaming-user-footprint-tile-updater\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.StreamingUserFootprintTileUpdater\", + \"/opt/osmesa-analytics.jar\", + \"--change-source\", \"${CHANGESET_SOURCE}\", + \"--tile-source\", \"${FOOTPRINT_VT_LOCATION}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:production\", + \"name\": \"streaming-user-footprint-tile-updater\" + } + ]" diff --git a/deployment/streaming/expand.sh b/deployment/streaming/scripts/expand.sh similarity index 100% rename from deployment/streaming/expand.sh rename to deployment/streaming/scripts/expand.sh diff --git a/deployment/streaming/get-tag.sh b/deployment/streaming/scripts/get-tag.sh similarity index 100% rename from deployment/streaming/get-tag.sh rename to deployment/streaming/scripts/get-tag.sh From 4b96c25883b87d1273d0cc8d5a077aae6933b182 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Mon, 19 Aug 2019 11:40:49 -0400 Subject: [PATCH 10/24] Make script executable --- deployment/streaming/scripts/define-streaming-vectortile-tasks.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 deployment/streaming/scripts/define-streaming-vectortile-tasks.sh diff --git a/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh b/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh old mode 100644 new mode 100755 From e636e4e6ca5048fc55ae8cba71a24d2a65084d6d Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Tue, 20 Aug 2019 17:15:51 -0400 Subject: [PATCH 11/24] Further changes to deployment scripts --- deployment/streaming/Makefile | 179 ++++-------------- .../scripts/batch-generate-edit-histograms.sh | 59 ++++++ .../scripts/batch-generate-footprints.sh | 61 ++++++ ...efine-production-streaming-update-tasks.sh | 6 +- .../define-staging-streaming-update-tasks.sh | 10 +- .../define-streaming-vectortile-tasks.sh | 2 +- .../oneoffs/ChangesetMetadataUpdater.scala | 2 +- .../oneoffs/ChangesetStatsUpdater.scala | 2 +- 8 files changed, 163 insertions(+), 158 deletions(-) create mode 100644 
deployment/streaming/scripts/batch-generate-edit-histograms.sh
 create mode 100644 deployment/streaming/scripts/batch-generate-footprints.sh

diff --git a/deployment/streaming/Makefile b/deployment/streaming/Makefile
index 84866847..2414e8ae 100644
--- a/deployment/streaming/Makefile
+++ b/deployment/streaming/Makefile
@@ -9,9 +9,9 @@ ifeq ($(VERSION_TAG), production)
 else
 	DATABASE=${STAGING_DB}
 	ECS_CLUSTER=${CLUSTER_NAME_STAGING}
-	TASK_SUFFIX="-staging"
+	TASK_SUFFIX=-staging
 endif
-DB_URI=${DB_BASE_URI}${DATABASE}
+DB_URI=${DB_BASE_URI}/${DATABASE}
 
 .EXPORT_ALL_VARIABLES:
 
@@ -51,6 +51,13 @@ define-staging-streaming-update-tasks:
 define-production-streaming-update-tasks:
 	./scripts/define-production-streaming-update-tasks.sh
 
+deploy-streaming-footprint-updater: export NETWORK_CONFIGURATION='{\
+    "awsvpcConfiguration": {\
+      "subnets": ["${ECS_SUBNET}"],\
+      "securityGroups": ["${ECS_SECURITY_GROUP}"],\
+      "assignPublicIp": "DISABLED"\
+    }\
+  }'
 deploy-streaming-footprint-updater:
 	aws ecs create-service \
 	  --cluster "${CLUSTER_NAME_DEPLOYMENT}" \
@@ -59,14 +66,15 @@ deploy-streaming-footprint-updater:
 	  --desired-count 1 \
 	  --launch-type FARGATE \
 	  --scheduling-strategy REPLICA \
-	  --network-configuration '{
-	    "awsvpcConfiguration": {
-	      "subnets": ["${ECS_SUBNET}"],
-	      "securityGroups": ["${ECS_SECURITY_GROUP}"],
-	      "assignPublicIp": "DISABLED"
-	    }
-	  }'
-
+	  --network-configuration ${NETWORK_CONFIGURATION}
+
+deploy-streaming-edit-histogram-updater: export NETWORK_CONFIGURATION='{\
+    "awsvpcConfiguration": {\
+      "subnets": ["${ECS_SUBNET}"],\
+      "securityGroups": ["${ECS_SECURITY_GROUP}"],\
+      "assignPublicIp": "DISABLED"\
+    }\
+  }'
 deploy-streaming-edit-histogram-updater:
 	aws ecs create-service \
 	  --cluster "${CLUSTER_NAME_DEPLOYMENT}" \
@@ -75,43 +83,24 @@ deploy-streaming-edit-histogram-updater:
 	  --desired-count 1 \
 	  --launch-type FARGATE \
 	  --scheduling-strategy REPLICA \
-	  --network-configuration '{
-	    "awsvpcConfiguration": {
-	      "subnets": ["${ECS_SUBNET}"],
-	      "securityGroups": ["${ECS_SECURITY_GROUP}"],
-	      "assignPublicIp": "DISABLED"
-	    }
-	  }'
-
+	  --network-configuration ${NETWORK_CONFIGURATION}
+
+deploy-streaming-stats-updaters: export NETWORK_CONFIGURATION='{\
+    "awsvpcConfiguration": {\
+      "subnets": ["${ECS_SUBNET}"],\
+      "securityGroups": ["${ECS_SECURITY_GROUP}"],\
+      "assignPublicIp": "DISABLED"\
+    }\
+  }'
 deploy-streaming-stats-updaters:
 	aws ecs create-service \
 	  --cluster "${ECS_CLUSTER}" \
-	  --service-name "streaming-augdiffs-stats-updater" \
-	  --task-definition "streaming-augmented-diffs-stats-updater${TASK_SUFFIX}" \
-	  --desired-count 1 \
-	  --launch-type FARGATE \
-	  --scheduling-strategy REPLICA \
-	  --network-configuration '{
-	    "awsvpcConfiguration": {
-	      "subnets": ["${ECS_SUBNET}"],
-	      "securityGroups": ["${ECS_SECURITY_GROUP}"],
-	      "assignPublicIp": "DISABLED"
-	    }
-	  }'
-	aws ecs create-service \
-	  --cluster "${ECS_CLUSTER}" \
-	  --service-name "streaming-changesets-stats-updater" \
-	  --task-definition "streaming-changesets-stats-updater${TASK_SUFFIX}" \
+	  --service-name "streaming-stats-updater" \
+	  --task-definition "streaming-stats-updater${TASK_SUFFIX}" \
 	  --desired-count 1 \
 	  --launch-type FARGATE \
 	  --scheduling-strategy REPLICA \
-	  --network-configuration '{
-	    "awsvpcConfiguration": {
-	      "subnets": ["${ECS_SUBNET}"],
-	      "securityGroups": ["${ECS_SECURITY_GROUP}"],
-	      "assignPublicIp": "DISABLED"
-	    }
-	  }'
+	  --network-configuration ${NETWORK_CONFIGURATION}
 
 deploy-streaming-vectortile-tasks: deploy-streaming-footprint-updater deploy-streaming-edit-histogram-updater
 
@@ -122,111 +111,7 @@ 
deploy-all-streaming-tasks: deploy-streaming-vectortile-tasks deploy-streaming-s ################### batch-generate-footprints: - aws emr create-cluster \ - --applications Name=Ganglia Name=Spark \ - --ebs-root-volume-size 10 \ - --ec2-attributes '{ - "KeyName": "${KEYPAIR}", - "InstanceProfile":"EMR_EC2_DefaultRole", - "ServiceAccessSecurityGroup": "${SERVICE_ACCESS_SECURITY_GROUP}", - "SubnetId": "${SUBNET}", - "EmrManagedSlaveSecurityGroup": "${EMR_SLAVE_SECURITY_GROUP}", - "EmrManagedMasterSecurityGroup": "${EMR_MASTER_SECURITY_GROUP}" - }' \ - --service-role EMR_DefaultRole \ - --release-label emr-5.19.0 \ - --name 'User footprint tile generation' \ - --instance-groups '[ - { - "InstanceCount": 1, - "BidPrice": "OnDemandPrice", - "InstanceGroupType": "MASTER", - "InstanceType": "${BATCH_INSTANCE_TYPE}", - "Name":"Master" - }, { - "InstanceCount": 20, - "BidPrice": "OnDemandPrice", - "InstanceGroupType": "CORE", - "InstanceType": "${BATCH_INSTANCE_TYPE}", - "Name":"Workers" - } - ]' \ - --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ - --auto-terminate \ - --region ${AWS_REGION} \ - --steps '[ - { - "Args": [ - "spark-submit", - "--deploy-mode", "cluster", - "--class", "osmesa.analytics.oneoffs.FootprintCommand", - "--conf", "spark.executor.memoryOverhead=2g", - "--conf", "spark.sql.shuffle.partitions=2000", - "--conf", "spark.speculation=true", - "${OSMESA_ANALYTICS_JAR}", - "--history", "${NOME_HISTORY_ORC}", - "--changesets", "${NOME_CHANGESETS_ORC", - "--out", "${FOOTPRINT_VT_LOCATION}", - "--type", "users", - ], - "Type": "CUSTOM_JAR", - "ActionOnFailure": "TERMINATE_CLUSTER", - "Jar": "command-runner.jar", - "Properties": "", - "Name": "FootprintCommand" - } - ]' + ./scripts/batch-generate-footprints.sh batch-generate-edit-histograms: - aws emr create-cluster \ - --applications Name=Ganglia Name=Spark \ - --ebs-root-volume-size 10 \ - --ec2-attributes '{ - "KeyName": "${KEYPAIR}", - "InstanceProfile":"EMR_EC2_DefaultRole", - "ServiceAccessSecurityGroup": "${SERVICE_ACCESS_SECURITY_GROUP}", - "SubnetId": "${SUBNET}", - "EmrManagedSlaveSecurityGroup": "${EMR_SLAVE_SECURITY_GROUP}", - "EmrManagedMasterSecurityGroup": "${EMR_MASTER_SECURITY_GROUP}" - }' \ - --service-role EMR_DefaultRole \ - --release-label emr-5.19.0 \ - --name 'Faceted State of the Data tile generation' \ - --instance-groups '[ - { - "InstanceCount": 1, - "BidPrice": "OnDemandPrice", - "InstanceGroupType": "MASTER", - "InstanceType": "${BATCH_INSTANCE_TYPE}", - "Name":"Master" - }, { - "InstanceCount": 20, - "BidPrice": "OnDemandPrice", - "InstanceGroupType": "CORE", - "InstanceType": "${BATCH_INSTANCE_TYPE}", - "Name":"Workers" - } - ]' \ - --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ - --auto-terminate \ - --region us-east-1 \ - --steps '[ - { - "Args": [ - "spark-submit", - "--deploy-mode", "cluster", - "--class", "osmesa.analytics.oneoffs.FacetedEditHistogramTileCreator", - "--conf", "spark.executor.memoryOverhead=2g", - "--conf", "spark.sql.shuffle.partitions=2000", - "--conf", "spark.speculation=true", - "${OSMESA_ANALYTICS_JAR}", - "--history", "${NOME_HISTORY_ORC}", - "--out", "${HISTOGRAM_VT_LOCATION}" - ], - "Type": "CUSTOM_JAR", - "ActionOnFailure": "TERMINATE_CLUSTER", - "Jar": "command-runner.jar", - "Properties": "", - "Name": "FacetedEditHistogramTileCreator" - } - ]' + ./scripts/batch-generate-edit-histograms.sh diff --git a/deployment/streaming/scripts/batch-generate-edit-histograms.sh b/deployment/streaming/scripts/batch-generate-edit-histograms.sh new file mode 100644 index 
00000000..c588aeb8 --- /dev/null +++ b/deployment/streaming/scripts/batch-generate-edit-histograms.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +if [ -z ${AWS_REGION+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." + exit 1 +fi + +aws emr create-cluster \ + --applications Name=Ganglia Name=Spark \ + --ebs-root-volume-size 10 \ + --ec2-attributes '{ + "KeyName": "${KEYPAIR}", + "InstanceProfile":"EMR_EC2_DefaultRole", + "ServiceAccessSecurityGroup": "${SERVICE_ACCESS_SECURITY_GROUP}", + "SubnetId": "${SUBNET}", + "EmrManagedSlaveSecurityGroup": "${EMR_SLAVE_SECURITY_GROUP}", + "EmrManagedMasterSecurityGroup": "${EMR_MASTER_SECURITY_GROUP}" + }' \ + --service-role EMR_DefaultRole \ + --release-label emr-5.19.0 \ + --name 'Faceted State of the Data tile generation' \ + --instance-groups '[ + { + "InstanceCount": 1, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "MASTER", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Master" + }, { + "InstanceCount": 20, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "CORE", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Workers" + } + ]' \ + --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ + --auto-terminate \ + --region us-east-1 \ + --steps '[ + { + "Args": [ + "spark-submit", + "--deploy-mode", "cluster", + "--class", "osmesa.analytics.oneoffs.FacetedEditHistogramTileCreator", + "--conf", "spark.executor.memoryOverhead=2g", + "--conf", "spark.sql.shuffle.partitions=2000", + "--conf", "spark.speculation=true", + "${OSMESA_ANALYTICS_JAR}", + "--history", "${NOME_HISTORY_ORC}", + "--out", "${HISTOGRAM_VT_LOCATION}" + ], + "Type": "CUSTOM_JAR", + "ActionOnFailure": "TERMINATE_CLUSTER", + "Jar": "command-runner.jar", + "Properties": "", + "Name": "FacetedEditHistogramTileCreator" + } + ]' diff --git a/deployment/streaming/scripts/batch-generate-footprints.sh b/deployment/streaming/scripts/batch-generate-footprints.sh new file mode 100644 index 00000000..c0429bac --- /dev/null +++ b/deployment/streaming/scripts/batch-generate-footprints.sh @@ -0,0 +1,61 @@ +#!/bin/bash + +if [ -z ${AWS_REGION+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+  exit 1
+fi
+
+aws emr create-cluster \
+  --applications Name=Ganglia Name=Spark \
+  --ebs-root-volume-size 10 \
+  --ec2-attributes '{
+      "KeyName": "${KEYPAIR}",
+      "InstanceProfile":"EMR_EC2_DefaultRole",
+      "ServiceAccessSecurityGroup": "${SERVICE_ACCESS_SECURITY_GROUP}",
+      "SubnetId": "${SUBNET}",
+      "EmrManagedSlaveSecurityGroup": "${EMR_SLAVE_SECURITY_GROUP}",
+      "EmrManagedMasterSecurityGroup": "${EMR_MASTER_SECURITY_GROUP}"
+    }' \
+  --service-role EMR_DefaultRole \
+  --release-label emr-5.19.0 \
+  --name 'User footprint tile generation' \
+  --instance-groups '[
+    {
+      "InstanceCount": 1,
+      "BidPrice": "OnDemandPrice",
+      "InstanceGroupType": "MASTER",
+      "InstanceType": "${BATCH_INSTANCE_TYPE}",
+      "Name":"Master"
+    }, {
+      "InstanceCount": 20,
+      "BidPrice": "OnDemandPrice",
+      "InstanceGroupType": "CORE",
+      "InstanceType": "${BATCH_INSTANCE_TYPE}",
+      "Name":"Workers"
+    }
+  ]' \
+  --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
+  --auto-terminate \
+  --region ${AWS_REGION} \
+  --steps '[
+    {
+      "Args": [
+        "spark-submit",
+        "--deploy-mode", "cluster",
+        "--class", "osmesa.analytics.oneoffs.FootprintCommand",
+        "--conf", "spark.executor.memoryOverhead=2g",
+        "--conf", "spark.sql.shuffle.partitions=2000",
+        "--conf", "spark.speculation=true",
+        "${OSMESA_ANALYTICS_JAR}",
+        "--history", "${NOME_HISTORY_ORC}",
+        "--changesets", "${NOME_CHANGESETS_ORC}",
+        "--out", "${FOOTPRINT_VT_LOCATION}",
+        "--type", "users"
+      ],
+      "Type": "CUSTOM_JAR",
+      "ActionOnFailure": "TERMINATE_CLUSTER",
+      "Jar": "command-runner.jar",
+      "Properties": "",
+      "Name": "FootprintCommand"
+    }
+  ]'
diff --git a/deployment/streaming/scripts/define-production-streaming-update-tasks.sh b/deployment/streaming/scripts/define-production-streaming-update-tasks.sh
index ed3d759d..60f0f231 100755
--- a/deployment/streaming/scripts/define-production-streaming-update-tasks.sh
+++ b/deployment/streaming/scripts/define-production-streaming-update-tasks.sh
@@ -6,7 +6,7 @@ if [ -z ${AWS_REGION+x} ]; then
 fi
 
 aws ecs register-task-definition \
-  --family osmesa-streaming-stats-updater \
+  --family streaming-stats-updater-production \
   --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \
   --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \
   --network-mode awsvpc \
@@ -33,7 +33,7 @@ aws ecs register-task-definition \
       \"environment\": [
         {
           \"name\": \"DATABASE_URL\",
-          \"value\": \"${DB_URI}${PRODUCTION_DB}\"
+          \"value\": \"${DB_BASE_URI}/${PRODUCTION_DB}\"
         }
       ],
       \"image\": \"${ECR_IMAGE}:production\",
@@ -58,7 +58,7 @@ aws ecs register-task-definition \
       \"environment\": [
         {
           \"name\": \"DATABASE_URL\",
-          \"value\": \"${DB_URI}${PRODUCTION_DB}\"
+          \"value\": \"${DB_BASE_URI}/${PRODUCTION_DB}\"
         }
       ],
       \"image\": \"${ECR_IMAGE}:production\",
diff --git a/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh b/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh
index 21f63eb4..d6d1fd03 100755
--- a/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh
+++ b/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh
@@ -6,7 +6,7 @@ if [ -z ${AWS_REGION+x} ]; then
 fi
 
 aws ecs register-task-definition \
-  --family osmesa-streaming-stats-updater \
+  --family streaming-stats-updater-staging \
   --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \
   --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \
   --network-mode awsvpc \
@@ -26,14 +26,14 @@ aws ecs register-task-definition \
       \"command\": [
         \"/spark/bin/spark-submit\",
\"--driver-memory\", \"2048m\", - \"--class\", \"osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor\", + \"--class\", \"osmesa.analytics.oneoffs.StreamingChangesetStatsUpdater\", \"/opt/osmesa-analytics.jar\", \"--augmented-diff-source\", \"${AUGDIFF_SOURCE}\" ], \"environment\": [ { \"name\": \"DATABASE_URL\", - \"value\": \"${DB_BASE_URI}${STAGING_DB}\" + \"value\": \"${DB_BASE_URI}/${STAGING_DB}\" } ], \"image\": \"${ECR_IMAGE}:latest\", @@ -51,14 +51,14 @@ aws ecs register-task-definition \ \"command\": [ \"/spark/bin/spark-submit\", \"--driver-memory\", \"2048m\", - \"--class\", \"osmesa.analytics.oneoffs.ChangesetStreamProcessor\", + \"--class\", \"osmesa.analytics.oneoffs.StreamingChangesetMetadataUpdater\", \"/opt/osmesa-analytics.jar\", \"--changeset-source\", \"${CHANGESET_SOURCE}\" ], \"environment\": [ { \"name\": \"DATABASE_URL\", - \"value\": \"${DB_URI}${STAGING_DB}\" + \"value\": \"${DB_BASE_URI}/${STAGING_DB}\" } ], \"image\": \"${ECR_IMAGE}:latest\", diff --git a/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh b/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh index b38617bf..7f2c5821 100755 --- a/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh +++ b/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh @@ -6,7 +6,7 @@ if [ -z ${AWS_REGION+x} ]; then fi aws ecs register-task-definition \ - --family osmesa-streaming-edit-histogram-tile-updater \ + --family streaming-edit-histogram-tile-updater \ --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ --network-mode awsvpc \ diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala index 619cc903..f316a863 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala @@ -81,7 +81,7 @@ object ChangesetMetadataUpdater val options = Map( Source.BaseURI -> changesetSource.toString, - Source.ProcessName -> "ChangesetStream" + Source.ProcessName -> "ChangesetMetadataUpdater" ) ++ startSequence .map(s => Map(Source.StartSequence -> s.toString)) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala index 6d48a9de..7ba7ecdb 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala @@ -92,7 +92,7 @@ object ChangesetStatsUpdater val options = Map( Source.BaseURI -> augmentedDiffSource.toString, - Source.ProcessName -> "AugmentedDiffStream" + Source.ProcessName -> "ChangesetStatsUpdater" ) ++ startSequence .map(s => Map(Source.StartSequence -> s.toString)) From 4ab2bcc9aeda55915815daf759df5f6d8661662c Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Tue, 20 Aug 2019 17:18:50 -0400 Subject: [PATCH 12/24] Be smarter about which environment vars to check --- deployment/streaming/scripts/batch-generate-edit-histograms.sh | 2 +- deployment/streaming/scripts/batch-generate-footprints.sh | 2 +- deployment/streaming/scripts/create-log-groups.sh | 2 +- .../scripts/define-production-streaming-update-tasks.sh | 2 +- .../streaming/scripts/define-staging-streaming-update-tasks.sh | 2 +- 
.../streaming/scripts/define-streaming-vectortile-tasks.sh | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/deployment/streaming/scripts/batch-generate-edit-histograms.sh b/deployment/streaming/scripts/batch-generate-edit-histograms.sh index c588aeb8..c646f6e0 100644 --- a/deployment/streaming/scripts/batch-generate-edit-histograms.sh +++ b/deployment/streaming/scripts/batch-generate-edit-histograms.sh @@ -1,6 +1,6 @@ #!/bin/bash -if [ -z ${AWS_REGION+x} ]; then +if [ -z ${VERSION_TAG+x} ]; then echo "Do not run this script directly. Use the Makefile in the parent directory." exit 1 fi diff --git a/deployment/streaming/scripts/batch-generate-footprints.sh b/deployment/streaming/scripts/batch-generate-footprints.sh index c0429bac..c40401ac 100644 --- a/deployment/streaming/scripts/batch-generate-footprints.sh +++ b/deployment/streaming/scripts/batch-generate-footprints.sh @@ -1,6 +1,6 @@ #!/bin/bash -if [ -z ${AWS_REGION+x} ]; then +if [ -z ${VERSION_TAG+x} ]; then echo "Do not run this script directly. Use the Makefile in the parent directory." exit 1 fi diff --git a/deployment/streaming/scripts/create-log-groups.sh b/deployment/streaming/scripts/create-log-groups.sh index edfd45aa..459a92cc 100755 --- a/deployment/streaming/scripts/create-log-groups.sh +++ b/deployment/streaming/scripts/create-log-groups.sh @@ -1,6 +1,6 @@ #!/bin/bash -if [ -z ${AWS_REGION+x} ]; then +if [ -z ${VERSION_TAG+x} ]; then echo "Do not run this script directly. Use the Makefile in the parent directory." exit 1 fi diff --git a/deployment/streaming/scripts/define-production-streaming-update-tasks.sh b/deployment/streaming/scripts/define-production-streaming-update-tasks.sh index 60f0f231..0d265591 100755 --- a/deployment/streaming/scripts/define-production-streaming-update-tasks.sh +++ b/deployment/streaming/scripts/define-production-streaming-update-tasks.sh @@ -1,6 +1,6 @@ #!/bin/bash -if [ -z ${AWS_REGION+x} ]; then +if [ -z ${VERSION_TAG+x} ]; then echo "Do not run this script directly. Use the Makefile in the parent directory." exit 1 fi diff --git a/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh b/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh index d6d1fd03..85e7fe65 100755 --- a/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh +++ b/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh @@ -1,6 +1,6 @@ #!/bin/bash -if [ -z ${AWS_REGION+x} ]; then +if [ -z ${VERSION_TAG+x} ]; then echo "Do not run this script directly. Use the Makefile in the parent directory." exit 1 fi diff --git a/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh b/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh index 7f2c5821..b2d2b36c 100755 --- a/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh +++ b/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh @@ -1,6 +1,6 @@ #!/bin/bash -if [ -z ${AWS_REGION+x} ]; then +if [ -z ${VERSION_TAG+x} ]; then echo "Do not run this script directly. Use the Makefile in the parent directory." 
exit 1 fi From 957285c7ecf7766c9cdc19841f5bac1513258d61 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Fri, 23 Aug 2019 10:31:03 -0400 Subject: [PATCH 13/24] Improve batch process deployment --- deployment/streaming/scripts/batch-process.sh | 59 +++++++++++++ .../streaming/scripts/configurations.json | 88 +++++++++++++++++++ 2 files changed, 147 insertions(+) create mode 100755 deployment/streaming/scripts/batch-process.sh create mode 100644 deployment/streaming/scripts/configurations.json diff --git a/deployment/streaming/scripts/batch-process.sh b/deployment/streaming/scripts/batch-process.sh new file mode 100755 index 00000000..1d86521c --- /dev/null +++ b/deployment/streaming/scripts/batch-process.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." + exit 1 +fi + +CLUSTER_NAME=$1 +STEP_NAME=$2 +NUM_EXECUTORS=$3 +ARGS=$4 + +set -x +aws emr create-cluster \ + --configurations "file://$(pwd)/scripts/configurations.json" \ + --applications Name=Ganglia Name=Spark \ + --log-uri ${S3_LOG_URI} \ + --ebs-root-volume-size 10 \ + --ec2-attributes "{ + \"KeyName\": \"${KEYPAIR}\", + \"InstanceProfile\":\"EMR_EC2_DefaultRole\", + \"SubnetId\": \"${SUBNET}\", + \"EmrManagedMasterSecurityGroup\": \"${MASTER_SECURITY_GROUP}\", + \"EmrManagedSlaveSecurityGroup\": \"${WORKER_SECURITY_GROUP}\", + \"ServiceAccessSecurityGroup\": \"${SERVICE_ACCESS_SG}\", + \"AdditionalMasterSecurityGroups\": [\"${SANDBOX_SG}\"], + \"AdditionalSlaveSecurityGroups\": [\"${SANDBOX_SG}\"] + }" \ + --service-role EMR_DefaultRole \ + --release-label emr-5.19.0 \ + --name "$CLUSTER_NAME" \ + --instance-groups "[ + { + \"InstanceCount\": 1, + \"BidPrice\": \"OnDemandPrice\", + \"InstanceGroupType\": \"MASTER\", + \"InstanceType\": \"${BATCH_INSTANCE_TYPE}\", + \"Name\":\"Master\" + }, { + \"InstanceCount\": ${NUM_EXECUTORS}, + \"BidPrice\": \"OnDemandPrice\", + \"InstanceGroupType\": \"CORE\", + \"InstanceType\": \"${BATCH_INSTANCE_TYPE}\", + \"Name\":\"Workers\" + } + ]" \ + --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ + --auto-terminate \ + --region us-east-1 \ + --steps "[ + { + \"Args\": $ARGS, + \"Type\": \"CUSTOM_JAR\", + \"ActionOnFailure\": \"TERMINATE_CLUSTER\", + \"Jar\": \"command-runner.jar\", + \"Properties\": \"\", + \"Name\": \"$STEP_NAME\" + } + ]" diff --git a/deployment/streaming/scripts/configurations.json b/deployment/streaming/scripts/configurations.json new file mode 100644 index 00000000..b8db515b --- /dev/null +++ b/deployment/streaming/scripts/configurations.json @@ -0,0 +1,88 @@ +[ + { + "Classification": "spark", + "Properties": { + "maximizeResourceAllocation": "false" + } + }, + { + "Classification": "spark-defaults", + "Properties": { + "spark.driver.maxResultSize": "3G", + "spark.dynamicAllocation.enabled": "true", + "spark.shuffle.service.enabled": "true", + "spark.shuffle.compress": "true", + "spark.shuffle.spill.compress": "true", + "spark.rdd.compress": "true", + "spark.executor.memoryOverhead": "1G", + "spark.driver.memoryOverhead": "1G", + "spark.driver.maxResultSize": "3G", + "spark.executor.extraJavaOptions" : "-XX:+UseParallelGC -Dgeotrellis.s3.threads.rdd.write=64" + } + }, + { + "Classification": "hdfs-site", + "Properties": { + "dfs.replication": "1", + "dfs.permissions": "false", + "dfs.datanode.max.xcievers": "16384", + "dfs.datanode.max.transfer.threads": "16384", + "dfs.datanode.balance.max.concurrent.moves": "1000", + "dfs.datanode.balance.bandwidthPerSec": "100000000" + } + 
}, + { + "Classification": "yarn-site", + "Properties": { + "yarn.resourcemanager.am.max-attempts": "1", + "yarn.nodemanager.vmem-check-enabled": "false", + "yarn.nodemanager.pmem-check-enabled": "false" + } + }, + { + "Classification": "hadoop-env", + "Configurations": [ + { + "Classification": "export", + "Properties": { + "JAVA_HOME": "/usr/lib/jvm/java-1.8.0", + "GDAL_DATA": "/usr/local/share/gdal", + "LD_LIBRARY_PATH": "/usr/local/lib", + "PYSPARK_PYTHON": "python27", + "PYSPARK_DRIVER_PYTHON": "python27" + } + } + ] + }, + { + "Classification": "spark-env", + "Configurations": [ + { + "Classification": "export", + "Properties": { + "JAVA_HOME": "/usr/lib/jvm/java-1.8.0", + "GDAL_DATA": "/usr/local/share/gdal", + "LD_LIBRARY_PATH": "/usr/local/lib", + "SPARK_PRINT_LAUNCH_COMMAND": "1", + "PYSPARK_PYTHON": "python27", + "PYSPARK_DRIVER_PYTHON": "python27" + } + } + ] + }, + { + "Classification": "yarn-env", + "Configurations": [ + { + "Classification": "export", + "Properties": { + "JAVA_HOME": "/usr/lib/jvm/java-1.8.0", + "GDAL_DATA": "/usr/local/share/gdal", + "LD_LIBRARY_PATH": "/usr/local/lib", + "PYSPARK_PYTHON": "python27", + "PYSPARK_DRIVER_PYTHON": "python27" + } + } + ] + } +] From a4b3172e2281c91160fa7630b3874586efdce192 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Fri, 23 Aug 2019 10:32:51 -0400 Subject: [PATCH 14/24] Ensure streaming services are stopped during deployment cycle; simplify Makefile --- deployment/streaming/Makefile | 41 +++++++------------ .../streaming/config-deployment.mk.template | 2 + .../scripts/stop-streaming-service.sh | 28 +++++++++++++ 3 files changed, 45 insertions(+), 26 deletions(-) create mode 100755 deployment/streaming/scripts/stop-streaming-service.sh diff --git a/deployment/streaming/Makefile b/deployment/streaming/Makefile index 2414e8ae..254d2fe3 100644 --- a/deployment/streaming/Makefile +++ b/deployment/streaming/Makefile @@ -51,14 +51,10 @@ define-staging-streaming-update-tasks: define-production-streaming-update-tasks: ./scripts/define-production-streaming-update-tasks.sh -deploy-streaming-footprint-updater: export NETWORK_CONFIGURATION='{\ - "awsvpcConfiguration": {\ - "subnets": ["${ECS_SUBNET}"],\ - "securityGroups": ["${ECS_SECURITY_GROUP}"],\ - "assignPublicIp": "DISABLED"\ - }\ - }' -deploy-streaming-footprint-updater: +stop-streaming-footprint-updater: + ./scripts/stop-streaming-service.sh streaming-user-footprint-tile-updater + +deploy-streaming-footprint-updater: stop-streaming-footprint-updater aws ecs create-service \ --cluster "${CLUSTER_NAME_DEPLOYMENT}" \ --service-name "streaming-user-footprint-tile-updater" \ @@ -68,14 +64,10 @@ deploy-streaming-footprint-updater: --scheduling-strategy REPLICA \ --network-configuration ${NETWORK_CONFIGURATION} -deploy-streaming-edit-histogram-updater: export NETWORK_CONFIGURATION='{\ - "awsvpcConfiguration": {\ - "subnets": ["${ECS_SUBNET}"],\ - "securityGroups": ["${ECS_SECURITY_GROUP}"],\ - "assignPublicIp": "DISABLED"\ - }\ - }' -deploy-streaming-edit-histogram-updater: +stop-streaming-edit-histogram-updater: + ./scripts/stop-streaming-service.sh streaming-edit-histogram-tile-updater + +deploy-streaming-edit-histogram-updater: stop-streaming-edit-histogram-updater aws ecs create-service \ --cluster "${CLUSTER_NAME_DEPLOYMENT}" \ --service-name "streaming-edit-histogram-tile-updater" \ @@ -85,14 +77,10 @@ deploy-streaming-edit-histogram-updater: --scheduling-strategy REPLICA \ --network-configuration ${NETWORK_CONFIGURATION} -deploy-streaming-stats-updaters: export 
NETWORK_CONFIGURATION='{\
-    "awsvpcConfiguration": {\
-      "subnets": ["${ECS_SUBNET}"],\
-      "securityGroups": ["${ECS_SECURITY_GROUP}"],\
-      "assignPublicIp": "DISABLED"\
-    }\
-  }'
-deploy-streaming-stats-updaters:
+stop-streaming-stats-updaters:
+	./scripts/stop-streaming-service.sh streaming-stats-updater
+
+deploy-streaming-stats-updaters: stop-streaming-stats-updaters
 	aws ecs create-service \
 	  --cluster "${ECS_CLUSTER}" \
 	  --service-name "streaming-stats-updater" \
 	  --task-definition "streaming-stats-updater${TASK_SUFFIX}" \
@@ -104,8 +92,6 @@ deploy-streaming-stats-updaters:
 
 deploy-streaming-vectortile-tasks: deploy-streaming-footprint-updater deploy-streaming-edit-histogram-updater
 
-deploy-all-streaming-tasks: deploy-streaming-vectortile-tasks deploy-streaming-stats-updaters
-
 ###################
 # Batch AWS Tasks #
 ###################
@@ -115,3 +101,6 @@ batch-generate-footprints:
 	./scripts/batch-generate-footprints.sh
 
 batch-generate-edit-histograms:
 	./scripts/batch-generate-edit-histograms.sh
+
+batch-generate-db-backfill:
+	./scripts/batch-process.sh "OSMesa Batch Process" "ChangesetStatsCreator" 64 "[\"spark-submit\", \"--deploy-mode\", \"cluster\", \"--class\", \"osmesa.analytics.oneoffs.ChangesetStatsCreator\", \"--conf\", \"spark.executor.memoryOverhead=2g\", \"--conf\", \"spark.sql.shuffle.partitions=2000\", \"--conf\", \"spark.speculation=true\", \"${OSMESA_ANALYTICS_JAR}\", \"--history\", \"${NOME_HISTORY_ORC}\", \"--changesets\", \"${NOME_CHANGESETS_ORC}\", \"--changeset-stream\", \"${CHANGESET_SOURCE}\", \"--database-url\", \"${DB_URI}\"]"
diff --git a/deployment/streaming/config-deployment.mk.template b/deployment/streaming/config-deployment.mk.template
index f9897d66..980fdb71 100644
--- a/deployment/streaming/config-deployment.mk.template
+++ b/deployment/streaming/config-deployment.mk.template
@@ -23,6 +23,8 @@ export DB_BASE_URI :=
 export PRODUCTION_DB :=
 export STAGING_DB :=
 
+export NETWORK_CONFIGURATION="{\"awsvpcConfiguration\": {\"subnets\": [\"${ECS_SUBNET}\"], \"securityGroups\": [\"${ECS_SECURITY_GROUP}\"], \"assignPublicIp\": \"DISABLED\"}}"
+
 ################################################################################
 # Batch resource definitions
 ################################################################################
diff --git a/deployment/streaming/scripts/stop-streaming-service.sh b/deployment/streaming/scripts/stop-streaming-service.sh
new file mode 100755
index 00000000..2eee9af7
--- /dev/null
+++ b/deployment/streaming/scripts/stop-streaming-service.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+if [ -z ${VERSION_TAG+x} ]; then
+  echo "Do not run this script directly. Use the Makefile in the parent directory." 
+  exit 1
+fi
+
+SERVICE=$1
+echo "Attempting to stop $SERVICE on cluster $ECS_CLUSTER"
+
+check_status() {
+    STATUS=$(aws ecs describe-services --services $SERVICE --cluster $ECS_CLUSTER | jq '.services[].status')
+}
+
+check_status
+if [[ $STATUS == "\"ACTIVE\"" ]]; then
+    aws ecs delete-service --service $SERVICE --cluster $ECS_CLUSTER --force
+    echo "Waiting for shut down"
+    check_status
+    while [[ $STATUS != "\"INACTIVE\"" ]]; do
+        echo "  current status: $STATUS, still waiting"
+        sleep 15s
+        check_status
+    done
+    echo "  final status: $STATUS"
+else
+    echo "Status was $STATUS, nothing to stop"
+fi
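By way of illustration, stopping a live service through the corresponding make target produces output along these lines (a sketch only: cluster and status values depend on the deployment, and statuses come back from jq as quoted JSON strings, which is why the comparisons above quote them):

    $ VERSION_TAG=latest ECS_CLUSTER=osm-stats-staging ./scripts/stop-streaming-service.sh streaming-stats-updater
    Attempting to stop streaming-stats-updater on cluster osm-stats-staging
    Waiting for shut down
      current status: "DRAINING", still waiting
      current status: "DRAINING", still waiting
      final status: "INACTIVE"

From 01bf4571f4031681891b81114d9ce175faf77fc1 Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Fri, 23 Aug 2019 10:33:16 -0400
Subject: [PATCH 15/24] Modify log messages

---
 .../osmesa/analytics/oneoffs/ChangesetStatsCreator.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala
index 7ef56ac2..95afec40 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala
@@ -179,13 +179,13 @@ object ChangesetStatsCreator
         })
 
       // Distributing these writes to the executors to avoid no suitable driver errors on master node
-      logger.warn(s"Writing AugmentedDiffStream sequence number as $augdiffEndSequence to $databaseUrl")
+      logger.warn(s"Writing augmented diff sequence number as $augdiffEndSequence to $databaseUrl")
       spark.sparkContext.parallelize(Seq(databaseUrl)).foreach { uri =>
-        ChangesetORCUpdaterUtils.saveLocations("AugmentedDiffStream", augdiffEndSequence, uri)
+        ChangesetORCUpdaterUtils.saveLocations("ChangesetStatsUpdater", augdiffEndSequence, uri)
       }
-      logger.warn(s"Writing ChangesetStream sequence number as $changesetsEndSequence to $databaseUrl")
+      logger.warn(s"Writing changeset stream sequence number as $changesetsEndSequence to $databaseUrl")
       spark.sparkContext.parallelize(Seq(databaseUrl)).foreach { uri =>
-        ChangesetORCUpdaterUtils.saveLocations("ChangesetStream", changesetsEndSequence, uri)
+        ChangesetORCUpdaterUtils.saveLocations("ChangesetMetadataUpdater", changesetsEndSequence, uri)
       }
 
       spark.stop()

From 6da0912aa62876c1f47338e8bec08ccd1f4cce9f Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Wed, 18 Sep 2019 13:16:44 -0400
Subject: [PATCH 16/24] Allow easy addition of extra EBS volumes in batch EMR
 deployment

---
 deployment/batch/makefiles/Makefile      | 16 ++++++++++++++--
 deployment/batch/makefiles/config-emr.mk | 10 +++++++---
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/deployment/batch/makefiles/Makefile b/deployment/batch/makefiles/Makefile
index 26236cf4..559e0348 100644
--- a/deployment/batch/makefiles/Makefile
+++ b/deployment/batch/makefiles/Makefile
@@ -25,6 +25,18 @@ ifndef CLUSTER_ID
 CLUSTER_ID=$(shell if [ -e "cluster-id.txt" ]; then cat cluster-id.txt; fi)
 endif
 
+ifndef CORE_EMR_ATTRS
+EMR_ATTRS_CORE=
+else
+EMR_ATTRS_CORE=,${CORE_EMR_ATTRS}
+endif
+
+ifndef MASTER_EMR_ATTRS
+EMR_ATTRS_MASTER=
+else
+EMR_ATTRS_MASTER=,${MASTER_EMR_ATTRS}
+endif
+
 rwildcard=$(foreach d,$(wildcard $1*),$(call rwildcard,$d/,$2) $(filter $(subst *,%,$2),$d))
 
 ${INGEST_ASSEMBLY}: $(call rwildcard, ${INGEST_SRC_DIR}/src, *.scala) ${INGEST_SRC_DIR}/build.sbt
@@ -45,8 +57,8 @@ create-cluster:
 	--ec2-attributes 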
KeyName=${EC2_KEY},SubnetId=${SUBNET_ID},EmrManagedMasterSecurityGroup=${MASTER_SECURITY_GROUP},EmrManagedSlaveSecurityGroup=${WORKER_SECURITY_GROUP},ServiceAccessSecurityGroup=${SERVICE_ACCESS_SG},AdditionalMasterSecurityGroups=${SANDBOX_SG},AdditionalSlaveSecurityGroups=${SANDBOX_SG} \
 	--applications Name=Ganglia Name=Hadoop Name=Hue Name=Spark Name=Zeppelin \
 	--instance-groups \
-'Name=Master,${MASTER_BID_PRICE}InstanceCount=1,InstanceGroupType=MASTER,InstanceType=${MASTER_INSTANCE}' \
-'Name=Workers,${WORKER_BID_PRICE}InstanceCount=${WORKER_COUNT},InstanceGroupType=CORE,InstanceType=${WORKER_INSTANCE}' \
+'Name=Master,${MASTER_BID_PRICE}InstanceCount=1,InstanceGroupType=MASTER,InstanceType=${MASTER_INSTANCE}${EMR_ATTRS_MASTER}' \
+'Name=Workers,${WORKER_BID_PRICE}InstanceCount=${WORKER_COUNT},InstanceGroupType=CORE,InstanceType=${WORKER_INSTANCE}${EMR_ATTRS_CORE}' \
 | tee cluster-id.txt
 
 upload-ingest: ${INGEST_ASSEMBLY}
diff --git a/deployment/batch/makefiles/config-emr.mk b/deployment/batch/makefiles/config-emr.mk
index e6ec9d3d..cb8bedfb 100644
--- a/deployment/batch/makefiles/config-emr.mk
+++ b/deployment/batch/makefiles/config-emr.mk
@@ -6,9 +6,9 @@ export SANDBOX_SG := sg-6b227c23
 export MASTER_INSTANCE := m3.xlarge
 export MASTER_PRICE := 0.10
-export WORKER_INSTANCE := m3.xlarge
-export WORKER_PRICE := 0.07
-export WORKER_COUNT := 32
+export WORKER_INSTANCE := r3.xlarge
+export WORKER_PRICE := 0.20
+export WORKER_COUNT := 64
 export USE_SPOT := true
 
 export DRIVER_MEMORY := 10000M
@@ -16,3 +16,7 @@ export DRIVER_CORES := 4
 export EXECUTOR_MEMORY := 10000M
 export EXECUTOR_CORES := 8
 export YARN_OVERHEAD := 1500
+
+# Uncomment/edit the following lines to add extra attributes to the cluster creation
+#export MASTER_EMR_ATTRS :=
+export CORE_EMR_ATTRS := EbsConfiguration={EbsOptimized=true,EbsBlockDeviceConfigs=[{VolumeSpecification={VolumeType=gp2,SizeInGB=1024}}]}
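With these settings, the Workers instance group passed to aws emr create-cluster expands to roughly the following (illustrative only; this assumes ${WORKER_BID_PRICE} resolves to "BidPrice=0.20," when USE_SPOT is true, with the bid-price handling defined elsewhere in these makefiles):

    Name=Workers,BidPrice=0.20,InstanceCount=64,InstanceGroupType=CORE,InstanceType=r3.xlarge,EbsConfiguration={EbsOptimized=true,EbsBlockDeviceConfigs=[{VolumeSpecification={VolumeType=gp2,SizeInGB=1024}}]}

From 06c46d4616786d205a6d719ebe2581fdb7d8898a Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Thu, 19 Sep 2019 15:51:59 -0400
Subject: [PATCH 17/24] Move useful components to Vectorpipe and rename
 ChangesetORCCreator

---
 .../oneoffs/ChangesetStatsCreator.scala       |  7 ++-
 ...ORCCreator.scala => MergeChangesets.scala} | 56 ++-----------------
 2 files changed, 10 insertions(+), 53 deletions(-)
 rename src/analytics/src/main/scala/osmesa/analytics/oneoffs/{ChangesetORCCreator.scala => MergeChangesets.scala} (76%)

diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala
index 95afec40..3fd0975f 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala
@@ -14,6 +14,7 @@ import osmesa.analytics.stats.functions._
 import vectorpipe.{internal => ProcessOSM}
 import vectorpipe.functions._
 import vectorpipe.functions.osm._
+import vectorpipe.sources.ChangesetSource
 import vectorpipe.util.{DBUtils, Geocode}
 
 object ChangesetStatsCreator
@@ -126,7 +127,7 @@ object ChangesetStatsCreator
       val changesets = spark.read.orc(changesetSource)
       val changesetsEndSequence = {
         val t = changesets.select(max(coalesce('createdAt, 'closedAt))).first.getAs[java.sql.Timestamp](0)
-        ChangesetORCUpdaterUtils.findSequenceFor(t.toInstant, changesetBaseURI).toInt
+        ChangesetSource.findSequenceFor(t.toInstant, changesetBaseURI).toInt
       }
 
       val changesetMetadata = changesets
@@ -181,11 +182,11 @@ object ChangesetStatsCreator
       // 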
Distributing these writes to the executors to avoid no suitable driver errors on master node logger.warn(s"Writing augmented diff sequence number as $augdiffEndSequence to $databaseUrl") spark.sparkContext.parallelize(Seq(databaseUrl)).foreach { uri => - ChangesetORCUpdaterUtils.saveLocations("ChangesetStatsUpdater", augdiffEndSequence, uri) + MergeChangesetUtils.saveLocations("ChangesetStatsUpdater", augdiffEndSequence, uri) } logger.warn(s"Writing changeset stream sequence number as $changesetsEndSequence to $databaseUrl") spark.sparkContext.parallelize(Seq(databaseUrl)).foreach { uri => - ChangesetORCUpdaterUtils.saveLocations("ChangesetMetadataUpdater", changesetsEndSequence, uri) + MergeChangesetUtils.saveLocations("ChangesetMetadataUpdater", changesetsEndSequence, uri) } spark.stop() diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala similarity index 76% rename from src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala rename to src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala index 312caa8e..84e579e8 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetORCCreator.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala @@ -31,13 +31,14 @@ import scalaj.http.Http * s3://path/to/history.orc * s3://path/to/output.orc */ -object ChangesetORCUpdater +object MergeChangesets extends CommandApp( name = "osmesa-changeset-orc-updater", header = "Bring existing changesets ORC file up to date using changeset stream", main = { - import ChangesetORCUpdaterUtils._ + import MergeChangesetUtils._ + import ChangesetSource._ val changesetSourceOpt = Opts @@ -77,7 +78,7 @@ object ChangesetORCUpdater orcArg, outputArg).mapN { (changesetSource, endTime, orcUri, outputURI) => - implicit val spark: SparkSession = Analytics.sparkSession("ChangesetORCCreator") + implicit val spark: SparkSession = Analytics.sparkSession("MergeChangesets") import spark.implicits._ @@ -85,7 +86,7 @@ object ChangesetORCUpdater val lastModified = df.select(max(coalesce('closed_at, 'created_at))).first.getAs[Timestamp](0) val startSequence = findSequenceFor(lastModified.toInstant, changesetSource) - val endSequence = endTime.map(findSequenceFor(_, changesetSource)).getOrElse(getCurrentSequence(changesetSource).sequence) + val endSequence = endTime.map(findSequenceFor(_, changesetSource)).getOrElse(getCurrentSequence(changesetSource).get.sequence) val options = Map( Source.BaseURI -> changesetSource.toString, @@ -120,7 +121,7 @@ object ChangesetORCUpdater } ) -object ChangesetORCUpdaterUtils { +object MergeChangesetUtils { implicit val readInstant: Argument[Instant] = new Argument[Instant] { override def read(string: String): ValidatedNel[String, Instant] = { try { Validated.valid(Instant.parse(string)) } @@ -135,51 +136,6 @@ object ChangesetORCUpdaterUtils { private implicit val dateTimeDecoder: Decoder[DateTime] = Decoder.instance(a => a.as[String].map(DateTime.parse(_, formatter))) - case class Sequence(last_run: DateTime, sequence: Long) - - def getCurrentSequence(baseURI: URI): Sequence = { - val response = - Http(baseURI.resolve("state.yaml").toString).asString - - val state = yaml.parser - .parse(response.body) - .leftMap(err => err: Error) - .flatMap(_.as[Sequence]) - .valueOr(throw _) - - state - } - - def getSequence(baseURI: URI, sequence: Long): Sequence = { - val s = f"${sequence+1}%09d" - val 
path = s"${s.slice(0, 3)}/${s.slice(3, 6)}/${s.slice(6, 9)}.state.txt" - - val response = - Http(baseURI.resolve(path).toString).asString - - yaml.parser - .parse(response.body) - .leftMap(err => err: Error) - .flatMap(_.as[Sequence]) - .valueOr(throw _) - } - - def estimateSequenceNumber(modifiedTime: Instant, baseURI: URI): Long = { - val current = getCurrentSequence(baseURI) - val diffMinutes = (current.last_run.toInstant.getMillis/1000 - modifiedTime.getEpochSecond) / 60 - current.sequence - diffMinutes - } - - def findSequenceFor(modifiedTime: Instant, baseURI: URI): Long = { - var guess = estimateSequenceNumber(modifiedTime, baseURI) - val target = org.joda.time.Instant.parse(modifiedTime.toString) - - while (getSequence(baseURI, guess).last_run.isAfter(target)) { guess -= 1 } - while (getSequence(baseURI, guess).last_run.isBefore(target)) { guess += 1 } - - getSequence(baseURI, guess).sequence - } - def saveLocations(procName: String, sequence: Int, databaseURI: URI) = { var connection = null.asInstanceOf[Connection] try { From 3349bf5c6c3c46c2bebcb86342e176ba1dde7020 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Wed, 25 Sep 2019 16:45:14 -0400 Subject: [PATCH 18/24] Use VP functions for timestamp/seqence number conversions --- .../osmesa/analytics/oneoffs/ChangesetStatsCreator.scala | 6 +++--- .../scala/osmesa/analytics/oneoffs/MergeChangesets.scala | 2 +- .../analytics/oneoffs/MergedChangesetStreamProcessor.scala | 7 ++----- .../oneoffs/StreamingChangesetMetadataUpdater.scala | 4 ++-- .../analytics/oneoffs/StreamingChangesetStatsUpdater.scala | 4 ++-- .../oneoffs/StreamingFacetedEditHistogramTileUpdater.scala | 4 ++-- 6 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala index 3fd0975f..f316e4f5 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala @@ -14,7 +14,7 @@ import osmesa.analytics.stats.functions._ import vectorpipe.{internal => ProcessOSM} import vectorpipe.functions._ import vectorpipe.functions.osm._ -import vectorpipe.sources.ChangesetSource +import vectorpipe.sources.{AugmentedDiffSource, ChangesetSource} import vectorpipe.util.{DBUtils, Geocode} object ChangesetStatsCreator @@ -59,8 +59,8 @@ object ChangesetStatsCreator val history = spark.read.orc(historySource) val augdiffEndSequence = { - val t = history.select(max('timestamp)).first.getAs[java.sql.Timestamp](0).toInstant - ((t.getEpochSecond - 1347432900) / 60).toInt + val t = history.select(max('timestamp)).first.getAs[java.sql.Timestamp](0) + AugmentedDiffSource.timestampToSequence(t) } val nodes = ProcessOSM.preprocessNodes(history) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala index 84e579e8..20a002e1 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala @@ -137,7 +137,7 @@ object MergeChangesetUtils { Decoder.instance(a => a.as[String].map(DateTime.parse(_, formatter))) def saveLocations(procName: String, sequence: Int, databaseURI: URI) = { - var connection = null.asInstanceOf[Connection] + var connection: Connection = null try { connection = 
diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
index 84e579e8..20a002e1 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
@@ -137,7 +137,7 @@ object MergeChangesetUtils {
     Decoder.instance(a => a.as[String].map(DateTime.parse(_, formatter)))
 
   def saveLocations(procName: String, sequence: Int, databaseURI: URI) = {
-    var connection = null.asInstanceOf[Connection]
+    var connection: Connection = null
    try {
       connection = DBUtils.getJdbcConnection(databaseURI)
       val upsertSequence =
diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergedChangesetStreamProcessor.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergedChangesetStreamProcessor.scala
index 93380f2a..4f9766a7 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergedChangesetStreamProcessor.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergedChangesetStreamProcessor.scala
@@ -11,7 +11,7 @@ import org.apache.spark.sql.functions._
 import vectorpipe.functions._
 import vectorpipe.functions.osm._
 import vectorpipe.model.ElementWithSequence
-import vectorpipe.sources.Source
+import vectorpipe.sources.{AugmentedDiffSource, Source}
 
 /*
  * Usage example:
@@ -160,10 +160,7 @@ object MergedChangesetStreamProcessor
       )
 
       val geomsWithWatermark = geoms
-        .withColumn(
-          "timestamp",
-          to_timestamp('sequence * 60 + 1347432900)
-        )
+        .withColumn("timestamp", AugmentedDiffSource.sequenceToTimestamp('sequence))
         // geoms are standalone; no need to wait for anything
         .withWatermark("timestamp", "0 seconds")
         .select('timestamp, 'changeset, '_type, 'id, 'version, 'minorVersion, 'updated)
diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetMetadataUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetMetadataUpdater.scala
index b145490d..91628d3c 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetMetadataUpdater.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetMetadataUpdater.scala
@@ -24,8 +24,8 @@ import osmesa.analytics.stats.ChangesetMetadataForeachWriter
  */
 object StreamingChangesetMetadataUpdater
   extends CommandApp(
-    name = "osmesa-augmented-diff-stream-processor",
-    header = "Update statistics from streaming augmented diffs",
+    name = "osmesa-changeset-stream-processor",
+    header = "Update statistics from changeset replication stream",
     main = {
       val changesetSourceOpt =
         Opts
diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetStatsUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetStatsUpdater.scala
index c9f9291f..f653d3b9 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetStatsUpdater.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetStatsUpdater.scala
@@ -14,7 +14,7 @@ import vectorpipe.{internal => ProcessOSM}
 import vectorpipe.functions._
 import vectorpipe.functions.osm._
 import vectorpipe.model.ElementWithSequence
-import vectorpipe.sources.Source
+import vectorpipe.sources.{AugmentedDiffSource, Source}
 import vectorpipe.util.{DBUtils, Geocode}
 
 /*
@@ -108,7 +108,7 @@ object StreamingChangesetStatsUpdater
         // in practice, this means that aggregation doesn't occur until the *next* sequence is received
         val query = Geocode(geoms.where(isTagged('tags)))
-          .withColumn("timestamp", to_timestamp('sequence * 60 + 1347432900))
+          .withColumn("timestamp", AugmentedDiffSource.sequenceToTimestamp('sequence))
           // if sequences are received sequentially (and atomically), 0 seconds should suffice; anything received with an
           // earlier timestamp after that point will be dropped
           .withWatermark("timestamp", "0 seconds")
diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingFacetedEditHistogramTileUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingFacetedEditHistogramTileUpdater.scala
index 41420ea2..355a3c6e 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingFacetedEditHistogramTileUpdater.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingFacetedEditHistogramTileUpdater.scala
@@ -13,7 +13,7 @@ import osmesa.analytics.{Analytics, EditHistogram}
 import vectorpipe.{internal => ProcessOSM}
 import vectorpipe.internal.{NodeType, WayType}
 import vectorpipe.functions.osm._
-import vectorpipe.sources.Source
+import vectorpipe.sources.{AugmentedDiffSource, Source}
 
 object StreamingFacetedEditHistogramTileUpdater
   extends CommandApp(
@@ -117,7 +117,7 @@ object StreamingFacetedEditHistogramTileUpdater
         .options(options)
         .load
         // convert sequence into timestamp
-        .withColumn("watermark", to_timestamp(from_unixtime('sequence * 60 + 1347432900)))
+        .withColumn("watermark", AugmentedDiffSource.sequenceToTimestamp('sequence))
         .withWatermark("watermark", "0 seconds")
 
       val nodes = diffs
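After this change, all of the streaming updaters share one pattern: derive an event-time column from the replication sequence, then declare a zero-lateness watermark. As the inline comments note, this is sound because sequences arrive in order, one per minute, so state for a minute can be finalized as soon as the next sequence appears. A condensed sketch of the shared pattern, using the raw arithmetic in place of the VP column function:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, from_unixtime, to_timestamp}

    // Sketch: event time derived from the sequence number, with a 0-second
    // watermark -- safe when sequences arrive sequentially, since nothing
    // older than the newest observed minute should ever appear.
    def withSequenceWatermark(diffs: DataFrame): DataFrame =
      diffs
        .withColumn("timestamp",
          to_timestamp(from_unixtime(col("sequence") * 60 + 1347432900)))
        .withWatermark("timestamp", "0 seconds")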
From baca909e376116350fbb0cf60e32889a9194f0b3 Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Thu, 26 Sep 2019 12:36:35 -0400
Subject: [PATCH 19/24] [skip ci] Minor formatting update; leave note for
 future work

---
 .../oneoffs/ChangesetStatsCreator.scala        | 18 +++++++++---------
 .../analytics/oneoffs/MergeChangesets.scala    |  3 +++
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala
index f316e4f5..2ee9a19c 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala
@@ -29,15 +29,15 @@ object ChangesetStatsCreator
         Opts
           .option[String]("changesets", help = "Location of the Changesets ORC file to process.")
 
-       val changesetBaseOpt =
-         Opts
-           .option[URI](
-             "changeset-stream",
-             short = "c",
-             metavar = "uri",
-             help = "HTTP Location of replication changesets"
-           )
-           .validate("Changeset source must have trailing '/'") { _.getPath.endsWith("/") }
+      val changesetBaseOpt =
+        Opts
+          .option[URI](
+            "changeset-stream",
+            short = "c",
+            metavar = "uri",
+            help = "HTTP Location of replication changesets"
+          )
+          .validate("Changeset source must have trailing '/'") { _.getPath.endsWith("/") }
 
       val databaseUrlOpt =
         Opts
diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
index 20a002e1..9b555048 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
@@ -95,6 +95,9 @@ object MergeChangesets
           )
 
           val changesets = spark.read.format(Source.Changesets).options(options).load
+
+          // TODO: Clean up the following by providing and using a function in VP to coerce the
+          // column names into camel case (see https://github.com/geotrellis/vectorpipe/issues/113)
           changesets
             .drop("comments", "sequence")
             .union(df.select(
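The TODO just added refers to a schema mismatch: the streamed changeset source and the historical ORC disagree on column naming conventions (snake_case `created_at` versus camelCase `createdAt`), which currently forces a hand-written `select` before the union. A sketch of the generic coercion the note asks VectorPipe to provide; this helper is hypothetical, not part of the patch series:

    import org.apache.spark.sql.DataFrame

    // Hypothetical helper sketching the coercion requested in the TODO above
    // (see geotrellis/vectorpipe#113): rename snake_case columns (created_at)
    // to camelCase (createdAt) so the historical ORC lines up with the
    // streamed changeset schema without a hand-written select.
    def snakeToCamel(df: DataFrame): DataFrame =
      df.columns.foldLeft(df) { (acc, name) =>
        val camel = "_([a-z\\d])".r.replaceAllIn(name, m => m.group(1).toUpperCase)
        acc.withColumnRenamed(name, camel)
      }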
From 64ae32e6c2e67b0ed0941f8fb6f6db2c32fe26ea Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Thu, 26 Sep 2019 13:52:24 -0400
Subject: [PATCH 20/24] Clean up references

---
 deployment/streaming/Makefile                             | 2 +-
 deployment/streaming/config-deployment.mk.template        | 4 ++--
 .../streaming/scripts/batch-generate-edit-histograms.sh   | 2 +-
 deployment/streaming/scripts/batch-generate-footprints.sh | 4 ++--
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/deployment/streaming/Makefile b/deployment/streaming/Makefile
index 254d2fe3..e443a50f 100644
--- a/deployment/streaming/Makefile
+++ b/deployment/streaming/Makefile
@@ -103,4 +103,4 @@ batch-generate-edit-histograms:
 	./scripts/batch-generate-edit-histograms.sh
 
 batch-generate-db-backfill:
-	./scripts/batch-process.sh "OSMesa Batch Process" "ChangesetStatsCreator" 64 "[\"spark-submit\", \"--deploy-mode\", \"cluster\", \"--class\", \"osmesa.analytics.oneoffs.ChangesetStatsCreator\", \"--conf\", \"spark.executor.memoryOverhead=2g\", \"--conf\", \"spark.sql.shuffle.partitions=2000\", \"--conf\", \"spark.speculation=true\", \"${OSMESA_ANALYTICS_JAR}\", \"--history\", \"${NOME_HISTORY_ORC}\", \"--changesets\", \"${NOME_CHANGESETS_ORC}\", \"--changeset-stream\", \"${CHANGESET_SOURCE}\", \"--database-url\", \"${DB_URI}\"]"
+	./scripts/batch-process.sh "OSMesa Batch Process" "ChangesetStatsCreator" 64 "[\"spark-submit\", \"--deploy-mode\", \"cluster\", \"--class\", \"osmesa.analytics.oneoffs.ChangesetStatsCreator\", \"--conf\", \"spark.executor.memoryOverhead=2g\", \"--conf\", \"spark.sql.shuffle.partitions=2000\", \"--conf\", \"spark.speculation=true\", \"${OSMESA_ANALYTICS_JAR}\", \"--history\", \"${HISTORY_ORC}\", \"--changesets\", \"${CHANGESETS_ORC}\", \"--changeset-stream\", \"${CHANGESET_SOURCE}\", \"--database-url\", \"${DB_URI}\"]"
diff --git a/deployment/streaming/config-deployment.mk.template b/deployment/streaming/config-deployment.mk.template
index 980fdb71..cf7ba4cc 100644
--- a/deployment/streaming/config-deployment.mk.template
+++ b/deployment/streaming/config-deployment.mk.template
@@ -35,8 +35,8 @@ export EMR_SLAVE_SECURITY_GROUP :=
 export BATCH_INSTANCE_TYPE := m4.xlarge
 export OSMESA_ANALYTICS_JAR := s3:///osmesa-analytics.jar
 
-export NOME_HISTORY_ORC :=
-export NOME_CHANGESETS_ORC :=
+export HISTORY_ORC :=
+export CHANGESETS_ORC :=
 
 export FOOTPRINT_VT_LOCATION :=
 export HISTOGRAM_VT_LOCATION :=
diff --git a/deployment/streaming/scripts/batch-generate-edit-histograms.sh b/deployment/streaming/scripts/batch-generate-edit-histograms.sh
index c646f6e0..934e63d7 100644
--- a/deployment/streaming/scripts/batch-generate-edit-histograms.sh
+++ b/deployment/streaming/scripts/batch-generate-edit-histograms.sh
@@ -47,7 +47,7 @@ aws emr create-cluster \
             "--conf", "spark.sql.shuffle.partitions=2000",
             "--conf", "spark.speculation=true",
             "${OSMESA_ANALYTICS_JAR}",
-            "--history", "${NOME_HISTORY_ORC}",
+            "--history", "${HISTORY_ORC}",
             "--out", "${HISTOGRAM_VT_LOCATION}"
         ],
         "Type": "CUSTOM_JAR",
diff --git a/deployment/streaming/scripts/batch-generate-footprints.sh b/deployment/streaming/scripts/batch-generate-footprints.sh
index c40401ac..2e1efde2 100644
--- a/deployment/streaming/scripts/batch-generate-footprints.sh
+++ b/deployment/streaming/scripts/batch-generate-footprints.sh
@@ -47,8 +47,8 @@ aws emr create-cluster \
             "--conf", "spark.sql.shuffle.partitions=2000",
             "--conf", "spark.speculation=true",
             "${OSMESA_ANALYTICS_JAR}",
-            "--history", "${NOME_HISTORY_ORC}",
-            "--changesets", "${NOME_CHANGESETS_ORC",
+            "--history", "${HISTORY_ORC}",
+            "--changesets", "${CHANGESETS_ORC",
             "--out", "${FOOTPRINT_VT_LOCATION}",
             "--type", "users",
         ],
From 7dd079e37b532b1e4d301e94e47e74f4aaccee83 Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Thu, 26 Sep 2019 13:57:35 -0400
Subject: [PATCH 21/24] Minor fixes to address comments

---
 deployment/streaming/scripts/batch-generate-footprints.sh     | 2 +-
 .../main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/deployment/streaming/scripts/batch-generate-footprints.sh b/deployment/streaming/scripts/batch-generate-footprints.sh
index 2e1efde2..fa09aa98 100644
--- a/deployment/streaming/scripts/batch-generate-footprints.sh
+++ b/deployment/streaming/scripts/batch-generate-footprints.sh
@@ -48,7 +48,7 @@ aws emr create-cluster \
             "--conf", "spark.speculation=true",
             "${OSMESA_ANALYTICS_JAR}",
             "--history", "${HISTORY_ORC}",
-            "--changesets", "${CHANGESETS_ORC",
+            "--changesets", "${CHANGESETS_ORC}",
             "--out", "${FOOTPRINT_VT_LOCATION}",
             "--type", "users",
         ],
diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
index 9b555048..4d4ea3b6 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
@@ -24,7 +24,7 @@ import scalaj.http.Http
 * sbt "project analytics" assembly
 *
 * spark-submit \
-*   --class osmesa.analytics.oneoffs.ChangesetORCUpdater \
+*   --class osmesa.analytics.oneoffs.MergeChangesets \
 *   ingest/target/scala-2.11/osmesa-analytics.jar \
 *   --changesets http://location/of/changeset/replications \
 *   --end-time 1970-01-01T13:00:00Z

From 5a26276a9d315404bf3492b3f02f0047bdf41ec0 Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Thu, 26 Sep 2019 13:59:46 -0400
Subject: [PATCH 22/24] Move to VP 1.1.0

---
 src/project/Version.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/project/Version.scala b/src/project/Version.scala
index 3a11f020..64ca5a79 100644
--- a/src/project/Version.scala
+++ b/src/project/Version.scala
@@ -3,7 +3,7 @@ object Version {
   val osmesa = "0.1.0"
   val geotrellis = "2.1.0"
   val geomesa = "2.3.0"
-  val vectorpipe = "1.1.0-SNAPSHOT"
+  val vectorpipe = "1.1.0"
   val decline = "0.5.0"
   val cats = "1.0.0"
   val scalactic = "3.0.3"

From 115ef174007e64ed1e08c2fe2fd5e6675df3f281 Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Thu, 26 Sep 2019 14:00:00 -0400
Subject: [PATCH 23/24] Fix description

---
 .../main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
index 4d4ea3b6..06d7dfde 100644
--- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
+++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala
@@ -33,8 +33,8 @@ import scalaj.http.Http
  */
 object MergeChangesets
   extends CommandApp(
-    name = "osmesa-changeset-orc-updater",
-    header = "Bring existing changesets ORC file up to date using changeset stream",
+    name = "osmesa-merge-changesets",
+    header = "Bring existing changeset ORC file up to date using changeset stream",
     main = {
From 99d812ee67ad545bf65ab9b1996b28032d5ac6e5 Mon Sep 17 00:00:00 2001
From: jpolchlo
Date: Thu, 26 Sep 2019 15:35:58 -0400
Subject: [PATCH 24/24] Remove cluster configurations

---
 deployment/streaming/scripts/batch-process.sh |  1 -
 .../streaming/scripts/configurations.json     | 88 ------------------
 2 files changed, 89 deletions(-)
 delete mode 100644 deployment/streaming/scripts/configurations.json

diff --git a/deployment/streaming/scripts/batch-process.sh b/deployment/streaming/scripts/batch-process.sh
index 1d86521c..cb45ee2e 100755
--- a/deployment/streaming/scripts/batch-process.sh
+++ b/deployment/streaming/scripts/batch-process.sh
@@ -12,7 +12,6 @@ ARGS=$4
 set -x
 
 aws emr create-cluster \
-    --configurations "file://$(pwd)/scripts/configurations.json" \
     --applications Name=Ganglia Name=Spark \
     --log-uri ${S3_LOG_URI} \
     --ebs-root-volume-size 10 \
diff --git a/deployment/streaming/scripts/configurations.json b/deployment/streaming/scripts/configurations.json
deleted file mode 100644
index b8db515b..00000000
--- a/deployment/streaming/scripts/configurations.json
+++ /dev/null
@@ -1,88 +0,0 @@
-[
-    {
-        "Classification": "spark",
-        "Properties": {
-            "maximizeResourceAllocation": "false"
-        }
-    },
-    {
-        "Classification": "spark-defaults",
-        "Properties": {
-            "spark.driver.maxResultSize": "3G",
-            "spark.dynamicAllocation.enabled": "true",
-            "spark.shuffle.service.enabled": "true",
-            "spark.shuffle.compress": "true",
-            "spark.shuffle.spill.compress": "true",
-            "spark.rdd.compress": "true",
-            "spark.executor.memoryOverhead": "1G",
-            "spark.driver.memoryOverhead": "1G",
-            "spark.driver.maxResultSize": "3G",
-            "spark.executor.extraJavaOptions" : "-XX:+UseParallelGC -Dgeotrellis.s3.threads.rdd.write=64"
-        }
-    },
-    {
-        "Classification": "hdfs-site",
-        "Properties": {
-            "dfs.replication": "1",
-            "dfs.permissions": "false",
-            "dfs.datanode.max.xcievers": "16384",
-            "dfs.datanode.max.transfer.threads": "16384",
-            "dfs.datanode.balance.max.concurrent.moves": "1000",
-            "dfs.datanode.balance.bandwidthPerSec": "100000000"
-        }
-    },
-    {
-        "Classification": "yarn-site",
-        "Properties": {
-            "yarn.resourcemanager.am.max-attempts": "1",
-            "yarn.nodemanager.vmem-check-enabled": "false",
-            "yarn.nodemanager.pmem-check-enabled": "false"
-        }
-    },
-    {
-        "Classification": "hadoop-env",
-        "Configurations": [
-            {
-                "Classification": "export",
-                "Properties": {
-                    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0",
-                    "GDAL_DATA": "/usr/local/share/gdal",
-                    "LD_LIBRARY_PATH": "/usr/local/lib",
-                    "PYSPARK_PYTHON": "python27",
-                    "PYSPARK_DRIVER_PYTHON": "python27"
-                }
-            }
-        ]
-    },
-    {
-        "Classification": "spark-env",
-        "Configurations": [
-            {
-                "Classification": "export",
-                "Properties": {
-                    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0",
-                    "GDAL_DATA": "/usr/local/share/gdal",
-                    "LD_LIBRARY_PATH": "/usr/local/lib",
-                    "SPARK_PRINT_LAUNCH_COMMAND": "1",
-                    "PYSPARK_PYTHON": "python27",
-                    "PYSPARK_DRIVER_PYTHON": "python27"
-                }
-            }
-        ]
-    },
-    {
-        "Classification": "yarn-env",
-        "Configurations": [
-            {
-                "Classification": "export",
-                "Properties": {
-                    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0",
-                    "GDAL_DATA": "/usr/local/share/gdal",
-                    "LD_LIBRARY_PATH": "/usr/local/lib",
-                    "PYSPARK_PYTHON": "python27",
-                    "PYSPARK_DRIVER_PYTHON": "python27"
-                }
-            }
-        ]
-    }
-]
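With configurations.json gone, the batch clusters fall back to EMR's defaults plus whatever each job passes on the command line (the batch scripts already forward several settings via `--conf`). If any of the removed spark-defaults tuning is still wanted, one option is to pin it per job where the SparkSession is built; a sketch only, reusing values from the deleted file:

    import org.apache.spark.sql.SparkSession

    // Sketch: re-applying a few of the removed spark-defaults keys at the job
    // level rather than via an EMR configurations file. Values mirror the
    // deleted configurations.json; whether they are still needed is a
    // deployment decision this patch series does not specify.
    val spark = SparkSession.builder
      .appName("ChangesetStatsCreator")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.shuffle.service.enabled", "true")
      .config("spark.driver.maxResultSize", "3G")
      .config("spark.executor.memoryOverhead", "1G")
      .getOrCreate()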