diff --git a/deployment/batch/makefiles/Makefile b/deployment/batch/makefiles/Makefile
index 26236cf4..559e0348 100644
--- a/deployment/batch/makefiles/Makefile
+++ b/deployment/batch/makefiles/Makefile
@@ -25,6 +25,18 @@ ifndef CLUSTER_ID
 CLUSTER_ID=$(shell if [ -e "cluster-id.txt" ]; then cat cluster-id.txt; fi)
 endif
 
+ifndef CORE_EMR_ATTRS
+EMR_ATTRS_CORE=
+else
+EMR_ATTRS_CORE=,${CORE_EMR_ATTRS}
+endif
+
+ifndef MASTER_EMR_ATTRS
+EMR_ATTRS_MASTER=
+else
+EMR_ATTRS_MASTER=,${MASTER_EMR_ATTRS}
+endif
+
 rwildcard=$(foreach d,$(wildcard $1*),$(call rwildcard,$d/,$2) $(filter $(subst *,%,$2),$d))
 
 ${INGEST_ASSEMBLY}: $(call rwildcard, ${INGEST_SRC_DIR}/src, *.scala) ${INGEST_SRC_DIR}/build.sbt
@@ -45,8 +57,8 @@ create-cluster:
 	--ec2-attributes KeyName=${EC2_KEY},SubnetId=${SUBNET_ID},EmrManagedMasterSecurityGroup=${MASTER_SECURITY_GROUP},EmrManagedSlaveSecurityGroup=${WORKER_SECURITY_GROUP},ServiceAccessSecurityGroup=${SERVICE_ACCESS_SG},AdditionalMasterSecurityGroups=${SANDBOX_SG},AdditionalSlaveSecurityGroups=${SANDBOX_SG} \
 	--applications Name=Ganglia Name=Hadoop Name=Hue Name=Spark Name=Zeppelin \
 	--instance-groups \
-'Name=Master,${MASTER_BID_PRICE}InstanceCount=1,InstanceGroupType=MASTER,InstanceType=${MASTER_INSTANCE}' \
-'Name=Workers,${WORKER_BID_PRICE}InstanceCount=${WORKER_COUNT},InstanceGroupType=CORE,InstanceType=${WORKER_INSTANCE}' \
+'Name=Master,${MASTER_BID_PRICE}InstanceCount=1,InstanceGroupType=MASTER,InstanceType=${MASTER_INSTANCE}${EMR_ATTRS_MASTER}' \
+'Name=Workers,${WORKER_BID_PRICE}InstanceCount=${WORKER_COUNT},InstanceGroupType=CORE,InstanceType=${WORKER_INSTANCE}${EMR_ATTRS_CORE}' \
 	| tee cluster-id.txt
 
 upload-ingest: ${INGEST_ASSEMBLY}
diff --git a/deployment/batch/makefiles/config-emr.mk b/deployment/batch/makefiles/config-emr.mk
index e6ec9d3d..cb8bedfb 100644
--- a/deployment/batch/makefiles/config-emr.mk
+++ b/deployment/batch/makefiles/config-emr.mk
@@ -6,9 +6,9 @@ export SANDBOX_SG := sg-6b227c23
 export MASTER_INSTANCE := m3.xlarge
 export MASTER_PRICE := 0.10
 
-export WORKER_INSTANCE := m3.xlarge
-export WORKER_PRICE := 0.07
-export WORKER_COUNT := 32
+export WORKER_INSTANCE := r3.xlarge
+export WORKER_PRICE := 0.20
+export WORKER_COUNT := 64
 
 export USE_SPOT := true
 export DRIVER_MEMORY := 10000M
@@ -16,3 +16,7 @@ export DRIVER_CORES := 4
 export EXECUTOR_MEMORY := 10000M
 export EXECUTOR_CORES := 8
 export YARN_OVERHEAD := 1500
+
+# Uncomment/edit the following lines to add extra attributes to the cluster creation
+#export MASTER_EMR_ATTRS :=
+export CORE_EMR_ATTRS := EbsConfiguration={EbsOptimized=true,EbsBlockDeviceConfigs=[{VolumeSpecification={VolumeType=gp2,SizeInGB=1024}}]}
diff --git a/deployment/build-container.sh b/deployment/build-container.sh
index 379c47b2..aa0417c3 100755
--- a/deployment/build-container.sh
+++ b/deployment/build-container.sh
@@ -1,5 +1,14 @@
 #!/bin/bash
 
+if [ -z ${VERSION_TAG+x} ]; then
+  echo "No version tag has been set. Do not run this script directly; instead, issue"
+  echo " make build-container"
+  echo "from the 'streaming' directory."
+  exit 1
+else
+  echo "Version tag is set to '${VERSION_TAG}'"
+fi
+
 set -xe
 SBT_DIR="../src"
 JAR_DIR=${SBT_DIR}/analytics/target/scala-2.11/
@@ -10,4 +19,5 @@ cd ${SBT_DIR}
 cp ${JAR_DIR}/osmesa-analytics.jar ${DOCKER_DIR}/osmesa-analytics.jar
 cd ${DOCKER_DIR}
 
-docker build -f ${DOCKER_DIR}/Dockerfile --tag osm_analytics:latest ${DOCKER_DIR}
+
+docker build -f ${DOCKER_DIR}/Dockerfile --tag osm_analytics:${VERSION_TAG} ${DOCKER_DIR}
diff --git a/deployment/streaming/.gitignore b/deployment/streaming/.gitignore
index 8019c7be..32e2785d 100644
--- a/deployment/streaming/.gitignore
+++ b/deployment/streaming/.gitignore
@@ -1,5 +1 @@
-repository
-docker-compose.local.yml
-docker-compose.deploy.yml
-config-local.mk
-config-aws.mk
+config-*.mk
diff --git a/deployment/streaming/Makefile b/deployment/streaming/Makefile
index a3483dda..e443a50f 100644
--- a/deployment/streaming/Makefile
+++ b/deployment/streaming/Makefile
@@ -1,104 +1,106 @@
-include config-aws.mk # Variables for AWS options
-include config-local.mk # Variables related to running locally
+include config-deployment.mk
 
-# The osmesa container
-LOCAL_IMG := osm_analytics:latest
+# If the user is on master branch, see if we should deploy to production
+VERSION_TAG=$(shell ./scripts/get-tag.sh)
+ifeq ($(VERSION_TAG), production)
+  DATABASE=${PRODUCTION_DB}
+  ECS_CLUSTER=${CLUSTER_NAME_DEPLOYMENT}
+  TASK_SUFFIX=
+else
+  DATABASE=${STAGING_DB}
+  ECS_CLUSTER=${CLUSTER_NAME_STAGING}
+  TASK_SUFFIX=-staging
+endif
+DB_URI=${DB_BASE_URI}/${DATABASE}
 
 .EXPORT_ALL_VARIABLES:
 
-#########
-# LOCAL #
-#########
-docker-compose.local.yml:
-	export LOCAL_IMG=${LOCAL_IMG}; \
-	export AUGDIFF_SOURCE=${LOCAL_AUGDIFF_SOURCE}; \
-	export CHANGESET_SOURCE=${LOCAL_CHANGESET_SOURCE}; \
-	export CHANGE_SOURCE=${LOCAL_CHANGE_SOURCE}; \
-	export AUGDIFF_START=${LOCAL_AUGDIFF_START}; \
-	export CHANGESET_START=${LOCAL_CHANGESET_START}; \
-	export CHANGE_START=${LOCAL_CHANGE_START}; \
-	./expand.sh docker-compose.local.yml.tpl > docker-compose.local.yml
+#############################
+# Docker image management #
+#############################
 
-start-local: docker-compose.local.yml
-	docker-compose -f docker-compose.local.yml up
+.PHONY: build-container login-aws-registry tag-image push-image
 
-stop-local:
-	docker-compose -f docker-compose.local.yml down
-
-
-#########
-# AWS #
-#########
+build-container:
+	cd ..
&& VERSION_TAG=${VERSION_TAG} ./build-container.sh login-aws-registry: eval `aws ecr get-login --no-include-email --region ${AWS_REGION}` -tag-image: - docker tag ${LOCAL_IMG} ${ECR_REPO} +tag-image: build-container + docker tag osm_analytics:${VERSION_TAG} ${ECR_IMAGE}:${VERSION_TAG} push-image: login-aws-registry tag-image - docker push ${ECR_REPO} - -.PHONY: docker-compose.deploy.yml - -docker-compose.deploy.yml: docker-compose.deploy.yml.tpl - export ECR_REPO=${ECR_REPO} - export AWS_LOG_GROUP=${AWS_LOG_GROUP}; \ - export AWS_REGION=${AWS_REGION}; \ - export AUGDIFF_SOURCE=${AUGDIFF_SOURCE}; \ - export AUGDIFF_START=${AUGDIFF_START}; \ - export CHANGESET_SOURCE=${CHANGESET_SOURCE}; \ - export CHANGESET_START=${CHANGESET_START}; \ - export DB_URI=${DB_URI}; \ - ./expand.sh $< > $@ - -.PHONY: configure-cluster - -configure-cluster: - ecs-cli configure \ - --cluster ${CLUSTER_NAME} \ - --region ${AWS_REGION} \ - --config-name ${CONFIG_NAME} - -cluster-up: - ecs-cli up \ - --keypair ${KEYPAIR} \ - --instance-role ${INSTANCE_ROLE} \ - --security-group ${SECURITY_GROUP} \ - --size 1 \ - --instance-type ${INSTANCE_TYPE} \ - --cluster-config ${CONFIG_NAME} \ - --subnets ${SUBNETS} \ - --vpc ${VPC} \ - --force \ - --verbose - -cluster-down: - ecs-cli down --cluster-config ${CONFIG_NAME} - -.PHONY: create-service - -create-service: docker-compose.deploy.yml configure-cluster - ecs-cli compose \ - --file $< create \ - --cluster ${CLUSTER_NAME} - -start-service: docker-compose.deploy.yml configure-cluster create-service - ecs-cli compose --file $< service up \ - --deployment-min-healthy-percent 0 \ - --create-log-groups \ - --cluster ${CLUSTER_NAME} - -stop-service: docker-compose.deploy.yml - ecs-cli compose --file $< down - - -######### -# ALL # -######### -build-container: - cd .. 
&& ./build-container.sh - -clean: - rm -f docker-compose.local.yml - rm -f docker-compose.deploy.yml - + docker push ${ECR_IMAGE}:${VERSION_TAG} + +####################### +# Streaming AWS Tasks # +####################### + +.PHONY: create-log-groups define-streaming-vectortile-tasks define-staging-streaming-update-tasks define-production-streaming-update-tasks deploy-streaming-footprint-updater deploy-streaming-edit-histogram-updater deploy-streaming-stats-updaters + +create-log-groups: + ./scripts/create-log-groups.sh + +define-streaming-vectortile-tasks: + ./scripts/define-streaming-vectortile-tasks.sh + +define-staging-streaming-update-tasks: + ./scripts/define-staging-streaming-update-tasks.sh + +define-production-streaming-update-tasks: + ./scripts/define-production-streaming-update-tasks.sh + +stop-streaming-footprint-updater: + ./scripts/stop-streaming-service.sh streaming-user-footprint-tile-updater + +deploy-streaming-footprint-updater: stop-streaming-footprint-updater + aws ecs create-service \ + --cluster "${CLUSTER_NAME_DEPLOYMENT}" \ + --service-name "streaming-user-footprint-tile-updater" \ + --task-definition "streaming-edit-histogram-tile-updater" \ + --desired-count 1 \ + --launch-type FARGATE \ + --scheduling-strategy REPLICA \ + --network-configuration ${NETWORK_CONFIGURATION} + +stop-streaming-edit-histogram-updater: + ./scripts/stop-streaming-service.sh streaming-edit-histogram-tile-updater + +deploy-streaming-edit-histogram-updater: stop-streaming-edit-histogram-updater + aws ecs create-service \ + --cluster "${CLUSTER_NAME_DEPLOYMENT}" \ + --service-name "streaming-edit-histogram-tile-updater" \ + --task-definition "streaming-edit-histogram-tile-updater" \ + --desired-count 1 \ + --launch-type FARGATE \ + --scheduling-strategy REPLICA \ + --network-configuration ${NETWORK_CONFIGURATION} + +stop-streaming-stats-updaters: + ./scripts/stop-streaming-service.sh streaming-stats-updater + +deploy-streaming-stats-updaters: stop-streaming-stats-updaters + aws ecs create-service \ + --cluster "${ECS_CLUSTER}" \ + --service-name "streaming-stats-updater" \ + --task-definition "streaming-stats-updater${TASK_SUFFIX}" \ + --desired-count 1 \ + --launch-type FARGATE \ + --scheduling-strategy REPLICA \ + --network-configuration ${NETWORK_CONFIGURATION} + +deploy-streaming-vectortile-tasks: deploy-footprint-updater deploy-streaming-edit-histogram-updater + +################### +# Batch AWS Tasks # +################### + +batch-generate-footprints: + ./scripts/batch-generate-footprints.sh + +batch-generate-edit-histograms: + ./scripts/batch-generate-edit-histograms.sh + +batch-generate-db-backfill: + ./scripts/batch-process.sh "OSMesa Batch Process" "ChangesetStatsCreator" 64 "[\"spark-submit\", \"--deploy-mode\", \"cluster\", \"--class\", \"osmesa.analytics.oneoffs.ChangesetStatsCreator\", \"--conf\", \"spark.executor.memoryOverhead=2g\", \"--conf\", \"spark.sql.shuffle.partitions=2000\", \"--conf\", \"spark.speculation=true\", \"${OSMESA_ANALYTICS_JAR}\", \"--history\", \"${HISTORY_ORC}\", \"--changesets\", \"${CHANGESETS_ORC}\", \"--changeset-stream\", \"${CHANGESET_SOURCE}\", \"--database-url\", \"${DB_URI}\"]" diff --git a/deployment/streaming/config-aws.mk.example b/deployment/streaming/config-aws.mk.example deleted file mode 100644 index 03e55711..00000000 --- a/deployment/streaming/config-aws.mk.example +++ /dev/null @@ -1,25 +0,0 @@ -export CONFIG_NAME := osm-stat-stream-config - -# AWS properties -export CLUSTER_NAME := osm-stat-stream-cluster -export INSTANCE_TYPE := 
m4.xlarge -export KEYPAIR := [AWS key pair] -export VPC := [VPC ID] -export SUBNETS := [comma-delimited list of subnets within the above VPC] -export SECURITY_GROUP := [comma-delimited list of AWS Security Group IDs] -export ECR_REPO := [AWS ECR repo URI] -export AWS_LOG_GROUP := osm-stats-stream -export AWS_REGION := us-east-1 -export INSTANCE_ROLE := [IAM instance role] - -export AUGDIFF_SOURCE := s3://path/to/augdiffs/ -export CHANGESET_SOURCE := https://planet.osm.org/replication/changesets/ -export AUGDIFF_START := [Start of Augdiff stream (for stats)] -export CHANGE_START := [Start of change stream (for user footprints)] -export CHANGESET_START := [Start of changeset stream (for stats)] -export OVERPASS_URL := [alternative Overpass URL] -export TILE_SOURCE := s3://path/to/user/footprints/ - - -export DB_URI := [URI to DB for writing outputs from stream] - diff --git a/deployment/streaming/config-deployment.mk.template b/deployment/streaming/config-deployment.mk.template new file mode 100644 index 00000000..cf7ba4cc --- /dev/null +++ b/deployment/streaming/config-deployment.mk.template @@ -0,0 +1,42 @@ +################################################################################ +# AWS properties +################################################################################ +export KEYPAIR := +export SUBNET := +export AWS_REGION := us-east-1 +export IAM_ACCOUNT := + +################################################################################ +# Streaming resource definitions +################################################################################ +export CLUSTER_NAME := osm-stat-stream-cluster +export STREAMING_INSTANCE_TYPE := m4.xlarge +export ECR_IMAGE := +export AWS_LOG_GROUP := streaming-stats-updater +export ECS_SUBNET := ${SUBNET} +export ECS_SECURITY_GROUP := + +export AUGDIFF_SOURCE := +export CHANGESET_SOURCE := + +export DB_BASE_URI := +export PRODUCTION_DB := +export STAGING_DB := + +export NETWORK_CONFIGURATION="{\"awsvpcConfiguration\": {\"subnets\": [\"${ECS_SUBNET}\"], \"securityGroups\": [\"${ECS_SECURITY_GROUP}\"], \"assignPublicIp\": \"DISABLED\"}}" + +################################################################################ +# Batch resource definitions +################################################################################ +export SERVICE_ACCESS_SECURITY_GROUP := ${ECS_SECURITY_GROUP} +export EMR_MASTER_SECURITY_GROUP := +export EMR_SLAVE_SECURITY_GROUP := + +export BATCH_INSTANCE_TYPE := m4.xlarge +export OSMESA_ANALYTICS_JAR := s3:///osmesa-analytics.jar + +export HISTORY_ORC := +export CHANGESETS_ORC := + +export FOOTPRINT_VT_LOCATION := +export HISTOGRAM_VT_LOCATION := diff --git a/deployment/streaming/config-local.mk.example b/deployment/streaming/config-local.mk.example deleted file mode 100644 index 02a525e9..00000000 --- a/deployment/streaming/config-local.mk.example +++ /dev/null @@ -1,7 +0,0 @@ -export LOCAL_AUGDIFF_SOURCE := s3://path/to/augdiffs/ -export LOCAL_CHANGE_SOURCE := https://planet.osm.org/replication/minute/ -export LOCAL_CHANGESET_SOURCE := https://planet.osm.org/replication/changesets/ -export LOCAL_AUGDIFF_START := [Start of augdiff stream] -export LOCAL_CHANGE_START := [Start of change stream] # see https://planet.osm.org/replication/minute/state.txt -export LOCAL_CHANGESET_START := [Start of changeset stream] # see https://planet.osm.org/replication/changesets/state.yaml - diff --git a/deployment/streaming/docker-compose.deploy.yml.tpl b/deployment/streaming/docker-compose.deploy.yml.tpl deleted 
file mode 100644 index 2cb6e6ad..00000000 --- a/deployment/streaming/docker-compose.deploy.yml.tpl +++ /dev/null @@ -1,42 +0,0 @@ -version: '3.0' -services: - augdiff-stream: - image: ${ECR_REPO}:latest - command: > - /spark/bin/spark-submit --driver-memory 2048m --class osmesa.analytics.oneoffs.StreamingChangesetStatsUpdater /opt/osmesa-analytics.jar - --augmented-diff-source ${AUGDIFF_SOURCE} - --start-sequence ${AUGDIFF_START} - --database-uri ${DB_URI} - logging: - driver: awslogs - options: - awslogs-group: ${AWS_LOG_GROUP} - awslogs-region: ${AWS_REGION} - awslogs-stream-prefix: augdiff - changeset-stream: - image: ${ECR_REPO}:latest - command: > - /spark/bin/spark-submit --driver-memory 2048m --class osmesa.analytics.oneoffs.StreamingChangesetMetadataUpdater /opt/osmesa-analytics.jar - --changeset-source ${CHANGESET_SOURCE} - --start-sequence ${CHANGESET_START} - --database-uri ${DB_URI} - logging: - driver: awslogs - options: - awslogs-group: ${AWS_LOG_GROUP} - awslogs-region: ${AWS_REGION} - awslogs-stream-prefix: changeset - user-footprint-updater: - image: ${ECR_REPO}:latest - command: > - /spark/bin/spark-submit --driver-memory 4096m --class osmesa.analytics.oneoffs.StreamingUserFootprintTileUpdater /opt/osmesa-analytics.jar - --change-source ${CHANGE_SOURCE} - --changes-start-sequence ${CHANGE_START} - --database-uri ${DB_URI} - --tile-source ${TILE_SOURCE} - logging: - driver: awslogs - options: - awslogs-group: ${AWS_LOG_GROUP} - awslogs-region: ${AWS_REGION} - awslogs-stream-prefix: user-footprints diff --git a/deployment/streaming/docker-compose.local.yml.tpl b/deployment/streaming/docker-compose.local.yml.tpl deleted file mode 100644 index 0ea66c72..00000000 --- a/deployment/streaming/docker-compose.local.yml.tpl +++ /dev/null @@ -1,51 +0,0 @@ -version: '3.0' -services: - db: - image: postgres:10.5 - volumes: - - ../sql:/docker-entrypoint-initdb.d - environment: - - POSTGRES_PASSWORD=pgsecret - networks: - db: - aliases: - - database - changeset-stream: - image: ${LOCAL_IMG} - volumes: - - ./log4j.properties:/spark/conf/log4j.properties - command: > - /spark/bin/spark-submit --class osmesa.analytics.oneoffs.ChangesetStreamProcessor /opt/osmesa-analytics.jar - --changeset-source ${CHANGESET_SOURCE} - --start-sequence ${CHANGESET_START} - --database-uri postgresql://postgres:pgsecret@database:5432/postgres - networks: - - db - augdiff-stream: - image: ${LOCAL_IMG} - volumes: - - ~/.aws:/root/.aws - - ./log4j.properties:/spark/conf/log4j.properties - environment: - - AWS_PROFILE - command: > - /spark/bin/spark-submit --class osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor /opt/osmesa-analytics.jar - --augmented-diff-source ${AUGDIFF_SOURCE} - --start-sequence ${AUGDIFF_START} - --database-uri postgresql://postgres:pgsecret@database:5432/postgres - networks: - - db - change-stream: - image: ${LOCAL_IMG} - volumes: - - ./log4j.properties:/spark/conf/log4j.properties - command: > - /spark/bin/spark-submit --class osmesa.analytics.oneoffs.ChangeStreamProcessor /opt/osmesa-analytics.jar - --change-source ${CHANGE_SOURCE} - --start-sequence ${CHANGE_START} - --database-uri postgresql://postgres:pgsecret@database:5432/postgres - networks: - - db -networks: - db: - diff --git a/deployment/streaming/docker-compose.yml b/deployment/streaming/docker-compose.yml deleted file mode 100644 index dbf0a149..00000000 --- a/deployment/streaming/docker-compose.yml +++ /dev/null @@ -1,47 +0,0 @@ -version: '3.7' -services: - db: - image: postgres:10.5 - volumes: - - 
../sql:/docker-entrypoint-initdb.d - environment: - - POSTGRES_PASSWORD=streamtest - networks: - db: - aliases: - - database - augdiff-stream: - image: osm_analytics:latest - command: > - /spark/bin/spark-submit - --class osmesa.analytics.oneoffs.StreamingChangesetStatsUpdater - /opt/osmesa-analytics.jar - --augmented-diff-source ${AUGDIFF_SOURCE} - --database-url postgresql://postgres:streamtest@database/postgres - --start-sequence 1 - deploy: - restart_policy: - condition: on-failure - delay: 1s - max_attempts: 10 - window: 120s - networks: - - db - changeset-stream: - image: osm_analytics:latest - command: > - /spark/bin/spark-submit - --class osmesa.analytics.oneoffs.StreamingChangesetMetadataUpdater - /opt/osmesa-analytics.jar - --start-sequence 1 - --database-url postgresql://postgres:streamtest@database/postgres - deploy: - restart_policy: - condition: on-failure - delay: 1s - max_attempts: 10 - window: 120s - networks: - - db -networks: - db: diff --git a/deployment/streaming/scripts/batch-generate-edit-histograms.sh b/deployment/streaming/scripts/batch-generate-edit-histograms.sh new file mode 100644 index 00000000..934e63d7 --- /dev/null +++ b/deployment/streaming/scripts/batch-generate-edit-histograms.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." + exit 1 +fi + +aws emr create-cluster \ + --applications Name=Ganglia Name=Spark \ + --ebs-root-volume-size 10 \ + --ec2-attributes '{ + "KeyName": "${KEYPAIR}", + "InstanceProfile":"EMR_EC2_DefaultRole", + "ServiceAccessSecurityGroup": "${SERVICE_ACCESS_SECURITY_GROUP}", + "SubnetId": "${SUBNET}", + "EmrManagedSlaveSecurityGroup": "${EMR_SLAVE_SECURITY_GROUP}", + "EmrManagedMasterSecurityGroup": "${EMR_MASTER_SECURITY_GROUP}" + }' \ + --service-role EMR_DefaultRole \ + --release-label emr-5.19.0 \ + --name 'Faceted State of the Data tile generation' \ + --instance-groups '[ + { + "InstanceCount": 1, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "MASTER", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Master" + }, { + "InstanceCount": 20, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "CORE", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Workers" + } + ]' \ + --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ + --auto-terminate \ + --region us-east-1 \ + --steps '[ + { + "Args": [ + "spark-submit", + "--deploy-mode", "cluster", + "--class", "osmesa.analytics.oneoffs.FacetedEditHistogramTileCreator", + "--conf", "spark.executor.memoryOverhead=2g", + "--conf", "spark.sql.shuffle.partitions=2000", + "--conf", "spark.speculation=true", + "${OSMESA_ANALYTICS_JAR}", + "--history", "${HISTORY_ORC}", + "--out", "${HISTOGRAM_VT_LOCATION}" + ], + "Type": "CUSTOM_JAR", + "ActionOnFailure": "TERMINATE_CLUSTER", + "Jar": "command-runner.jar", + "Properties": "", + "Name": "FacetedEditHistogramTileCreator" + } + ]' diff --git a/deployment/streaming/scripts/batch-generate-footprints.sh b/deployment/streaming/scripts/batch-generate-footprints.sh new file mode 100644 index 00000000..fa09aa98 --- /dev/null +++ b/deployment/streaming/scripts/batch-generate-footprints.sh @@ -0,0 +1,61 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws emr create-cluster \ + --applications Name=Ganglia Name=Spark \ + --ebs-root-volume-size 10 \ + --ec2-attributes '{ + "KeyName": "${KEYPAIR}", + "InstanceProfile":"EMR_EC2_DefaultRole", + "ServiceAccessSecurityGroup": "${SERVICE_ACCESS_SECURITY_GROUP}", + "SubnetId": "${SUBNET}", + "EmrManagedSlaveSecurityGroup": "${EMR_SLAVE_SECURITY_GROUP}", + "EmrManagedMasterSecurityGroup": "${EMR_MASTER_SECURITY_GROUP}" + }' \ + --service-role EMR_DefaultRole \ + --release-label emr-5.19.0 \ + --name 'User footprint tile generation' \ + --instance-groups '[ + { + "InstanceCount": 1, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "MASTER", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Master" + }, { + "InstanceCount": 20, + "BidPrice": "OnDemandPrice", + "InstanceGroupType": "CORE", + "InstanceType": "${BATCH_INSTANCE_TYPE}", + "Name":"Workers" + } + ]' \ + --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ + --auto-terminate \ + --region ${AWS_REGION} \ + --steps '[ + { + "Args": [ + "spark-submit", + "--deploy-mode", "cluster", + "--class", "osmesa.analytics.oneoffs.FootprintCommand", + "--conf", "spark.executor.memoryOverhead=2g", + "--conf", "spark.sql.shuffle.partitions=2000", + "--conf", "spark.speculation=true", + "${OSMESA_ANALYTICS_JAR}", + "--history", "${HISTORY_ORC}", + "--changesets", "${CHANGESETS_ORC}", + "--out", "${FOOTPRINT_VT_LOCATION}", + "--type", "users", + ], + "Type": "CUSTOM_JAR", + "ActionOnFailure": "TERMINATE_CLUSTER", + "Jar": "command-runner.jar", + "Properties": "", + "Name": "FootprintCommand" + } + ]' diff --git a/deployment/streaming/scripts/batch-process.sh b/deployment/streaming/scripts/batch-process.sh new file mode 100755 index 00000000..cb45ee2e --- /dev/null +++ b/deployment/streaming/scripts/batch-process.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +CLUSTER_NAME=$1 +STEP_NAME=$2 +NUM_EXECUTORS=$3 +ARGS=$4 + +set -x +aws emr create-cluster \ + --applications Name=Ganglia Name=Spark \ + --log-uri ${S3_LOG_URI} \ + --ebs-root-volume-size 10 \ + --ec2-attributes "{ + \"KeyName\": \"${KEYPAIR}\", + \"InstanceProfile\":\"EMR_EC2_DefaultRole\", + \"SubnetId\": \"${SUBNET}\", + \"EmrManagedMasterSecurityGroup\": \"${MASTER_SECURITY_GROUP}\", + \"EmrManagedSlaveSecurityGroup\": \"${WORKER_SECURITY_GROUP}\", + \"ServiceAccessSecurityGroup\": \"${SERVICE_ACCESS_SG}\", + \"AdditionalMasterSecurityGroups\": [\"${SANDBOX_SG}\"], + \"AdditionalSlaveSecurityGroups\": [\"${SANDBOX_SG}\"] + }" \ + --service-role EMR_DefaultRole \ + --release-label emr-5.19.0 \ + --name "$CLUSTER_NAME" \ + --instance-groups "[ + { + \"InstanceCount\": 1, + \"BidPrice\": \"OnDemandPrice\", + \"InstanceGroupType\": \"MASTER\", + \"InstanceType\": \"${BATCH_INSTANCE_TYPE}\", + \"Name\":\"Master\" + }, { + \"InstanceCount\": ${NUM_EXECUTORS}, + \"BidPrice\": \"OnDemandPrice\", + \"InstanceGroupType\": \"CORE\", + \"InstanceType\": \"${BATCH_INSTANCE_TYPE}\", + \"Name\":\"Workers\" + } + ]" \ + --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ + --auto-terminate \ + --region us-east-1 \ + --steps "[ + { + \"Args\": $ARGS, + \"Type\": \"CUSTOM_JAR\", + \"ActionOnFailure\": \"TERMINATE_CLUSTER\", + \"Jar\": \"command-runner.jar\", + \"Properties\": \"\", + \"Name\": \"$STEP_NAME\" + } + ]" diff --git a/deployment/streaming/scripts/create-log-groups.sh b/deployment/streaming/scripts/create-log-groups.sh new file mode 100755 index 00000000..459a92cc --- /dev/null +++ b/deployment/streaming/scripts/create-log-groups.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." + exit 1 +fi + +DEFINED_GROUPS=$(aws logs describe-log-groups | jq '.logGroups[].logGroupName' | sed -e 's/"//g') + +if [[ $DEFINED_GROUPS != *"/ecs/${AWS_LOG_GROUP}"* ]]; then + aws logs create-log-group \ + --log-group-name /ecs/${AWS_LOG_GROUP} +fi + +if [[ $DEFINED_GROUPS != *"/ecs/${AWS_LOG_GROUP}-staging"* ]]; then + aws logs create-log-group \ + --log-group-name /ecs/${AWS_LOG_GROUP}-staging +fi + +if [[ $DEFINED_GROUPS != *"/ecs/streaming-user-footprint-tile-updater"* ]]; then + aws logs create-log-group \ + --log-group-name /ecs/streaming-user-footprint-tile-updater +fi + +if [[ $DEFINED_GROUPS != *"/ecs/streaming-edit-histogram-tile-updater"* ]]; then + aws logs create-log-group \ + --log-group-name /ecs/streaming-edit-histogram-tile-updater +fi diff --git a/deployment/streaming/scripts/define-production-streaming-update-tasks.sh b/deployment/streaming/scripts/define-production-streaming-update-tasks.sh new file mode 100755 index 00000000..0d265591 --- /dev/null +++ b/deployment/streaming/scripts/define-production-streaming-update-tasks.sh @@ -0,0 +1,67 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws ecs register-task-definition \ + --family streaming-stats-updater-production \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "4 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor\", + \"/opt/osmesa-analytics.jar\", + \"--augmented-diff-source\", \"${AUGDIFF_SOURCE}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:production\", + \"name\": \"streaming-augmented-diffs-stats-updater\" + }, + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.ChangesetStreamProcessor\", + \"/opt/osmesa-analytics.jar\", + \"--changeset-source\", \"${CHANGESET_SOURCE}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:production\", + \"name\": \"streaming-changesets-stats-updater\" + } + ]" diff --git a/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh b/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh new file mode 100755 index 00000000..85e7fe65 --- /dev/null +++ b/deployment/streaming/scripts/define-staging-streaming-update-tasks.sh @@ -0,0 +1,67 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws ecs register-task-definition \ + --family streaming-stats-updater-staging \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "4 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}-staging\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.StreamingChangesetStatsUpdater\", + \"/opt/osmesa-analytics.jar\", + \"--augmented-diff-source\", \"${AUGDIFF_SOURCE}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${STAGING_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"streaming-augmented-diffs-stats-updater-staging\" + }, + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}-staging\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.StreamingChangesetMetadataUpdater\", + \"/opt/osmesa-analytics.jar\", + \"--changeset-source\", \"${CHANGESET_SOURCE}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${STAGING_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"streaming-changesets-stats-updater-staging\" + } + ]" diff --git a/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh b/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh new file mode 100755 index 00000000..b2d2b36c --- /dev/null +++ b/deployment/streaming/scripts/define-streaming-vectortile-tasks.sh @@ -0,0 +1,69 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws ecs register-task-definition \ + --family streaming-edit-histogram-tile-updater \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "2 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/streaming-edit-histogram-tile-updater\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.StreamingFacetedEditHistogramTileUpdater\", + \"/opt/osmesa-analytics.jar\", + \"--augmented-diff-source\", \"${AUGDIFF_SOURCE}\", + \"--tile-source\", \"${HISTOGRAM_VT_LOCATION}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}\", + \"name\": \"streaming-edit-histogram-tile-updater\" + }, + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/streaming-user-footprint-tile-updater\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"/spark/bin/spark-submit\", + \"--driver-memory\", \"2048m\", + \"--class\", \"osmesa.analytics.oneoffs.StreamingUserFootprintTileUpdater\", + \"/opt/osmesa-analytics.jar\", + \"--change-source\", \"${CHANGESET_SOURCE}\", + \"--tile-source\", \"${FOOTPRINT_VT_LOCATION}\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:production\", + \"name\": \"streaming-user-footprint-tile-updater\" + } + ]" diff --git a/deployment/streaming/expand.sh b/deployment/streaming/scripts/expand.sh similarity index 100% rename from deployment/streaming/expand.sh rename to deployment/streaming/scripts/expand.sh diff --git a/deployment/streaming/scripts/get-tag.sh b/deployment/streaming/scripts/get-tag.sh new file mode 100755 index 00000000..388bc0dd --- /dev/null +++ b/deployment/streaming/scripts/get-tag.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ "$(git branch | grep '* master')" = "* master" ]; then + while true; do + echo "You are on the master branch. Do you wish to publish to the production tag?" + select yn in "Yes" "No"; do + case $yn in + Yes ) VERSION_TAG="production"; break;; + No ) VERSION_TAG="latest"; break;; + esac + done + done +else + VERSION_TAG="latest" +fi + +echo "${VERSION_TAG}" diff --git a/deployment/streaming/scripts/stop-streaming-service.sh b/deployment/streaming/scripts/stop-streaming-service.sh new file mode 100755 index 00000000..2eee9af7 --- /dev/null +++ b/deployment/streaming/scripts/stop-streaming-service.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +SERVICE=$1 +echo "Attempting to stop $SERVICE on cluster $ECS_CLUSTER" + +check_status() { + STATUS=$(aws ecs describe-services --services $SERVICE --cluster $ECS_CLUSTER | jq '.services[].status') +} + +check_status +if [[ $STATUS == "\"ACTIVE\"" ]]; then + aws ecs delete-service --service $SERVICE --cluster $ECS_CLUSTER --force + echo "Waiting for shut down" + check_status + while [[ $STATUS != "\"INACTIVE\"" ]]; do + echo " current status: $STATUS, still waiting" + sleep 15s + check_status + done + echo " final status: $STATUS" +else + echo "Status was $STATUS, nothing to stop" +fi diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala index 827af724..f316a863 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetMetadataUpdater.scala @@ -80,7 +80,8 @@ object ChangesetMetadataUpdater import ss.implicits._ val options = Map( - Source.BaseURI -> changesetSource.toString + Source.BaseURI -> changesetSource.toString, + Source.ProcessName -> "ChangesetMetadataUpdater" ) ++ startSequence .map(s => Map(Source.StartSequence -> s.toString)) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala index e7d0af63..2ee9a19c 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsCreator.scala @@ -1,6 +1,7 @@ package osmesa.analytics.oneoffs import java.net.URI +import java.sql._ import cats.implicits._ import com.monovore.decline.{CommandApp, Opts} @@ -13,7 +14,8 @@ import osmesa.analytics.stats.functions._ import vectorpipe.{internal => ProcessOSM} import vectorpipe.functions._ import vectorpipe.functions.osm._ -import vectorpipe.util.Geocode +import vectorpipe.sources.{AugmentedDiffSource, ChangesetSource} +import vectorpipe.util.{DBUtils, Geocode} object ChangesetStatsCreator extends CommandApp( @@ -27,6 +29,16 @@ object ChangesetStatsCreator Opts .option[String]("changesets", help = "Location of the Changesets ORC file to process.") + val changesetBaseOpt = + Opts + .option[URI]( + "changeset-stream", + short = "c", + metavar = "uri", + help = "HTTP Location of replication changesets" + ) + .validate("Changeset source must have trailing '/'") { _.getPath.endsWith("/") } + val databaseUrlOpt = Opts .option[URI]( @@ -37,12 +49,20 @@ object ChangesetStatsCreator ) .orElse(Opts.env[URI]("DATABASE_URL", help = "The URL of the database")) - (historyOpt, changesetsOpt, databaseUrlOpt).mapN { - (historySource, changesetSource, databaseUrl) => + (historyOpt, changesetsOpt, changesetBaseOpt, databaseUrlOpt).mapN { + (historySource, changesetSource, changesetBaseURI, databaseUrl) => implicit val spark: SparkSession = Analytics.sparkSession("ChangesetStats") import spark.implicits._ + val logger = org.apache.log4j.Logger.getLogger(getClass()) + val history = spark.read.orc(historySource) + + val augdiffEndSequence = { + val t = history.select(max('timestamp)).first.getAs[java.sql.Timestamp](0) + AugmentedDiffSource.timestampToSequence(t) + } + val nodes = ProcessOSM.preprocessNodes(history) val ways = ProcessOSM.preprocessWays(history) @@ -105,23 +125,27 @@ object ChangesetStatsCreator ) val changesets = 
spark.read.orc(changesetSource) + val changesetsEndSequence = { + val t = changesets.select(max(coalesce('createdAt, 'closedAt))).first.getAs[java.sql.Timestamp](0) + ChangesetSource.findSequenceFor(t.toInstant, changesetBaseURI).toInt + } val changesetMetadata = changesets .groupBy('id, 'tags.getItem("created_by") as 'editor, 'uid, 'user, - 'created_at, + 'createdAt, 'tags.getItem("comment") as 'comment, - 'tags.getItem("hashtags") as 'hashtag) - .agg(first('closed_at, ignoreNulls = true) as 'closed_at) + 'tags.getItem("hashtags") as 'hashtags) + .agg(first('closedAt, ignoreNulls = true) as 'closedAt) .select( 'id, 'editor, 'uid, 'user, - 'created_at as 'createdAt, - 'closed_at as 'closedAt, + 'createdAt, + 'closedAt, merge_sets(hashtags('comment), hashtags('hashtags)) as 'hashtags ) @@ -155,6 +179,16 @@ object ChangesetStatsCreator } }) + // Distributing these writes to the executors to avoid no suitable driver errors on master node + logger.warn(s"Writing augmented diff sequence number as $augdiffEndSequence to $databaseUrl") + spark.sparkContext.parallelize(Seq(databaseUrl)).foreach { uri => + MergeChangesetUtils.saveLocations("ChangesetStatsUpdater", augdiffEndSequence, uri) + } + logger.warn(s"Writing changeset stream sequence number as $changesetsEndSequence to $databaseUrl") + spark.sparkContext.parallelize(Seq(databaseUrl)).foreach { uri => + MergeChangesetUtils.saveLocations("ChangesetMetadataUpdater", changesetsEndSequence, uri) + } + spark.stop() } } diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala index 81d2b5d6..7ba7ecdb 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/ChangesetStatsUpdater.scala @@ -91,7 +91,8 @@ object ChangesetStatsUpdater import ss.implicits._ val options = Map( - Source.BaseURI -> augmentedDiffSource.toString + Source.BaseURI -> augmentedDiffSource.toString, + Source.ProcessName -> "ChangesetStatsUpdater" ) ++ startSequence .map(s => Map(Source.StartSequence -> s.toString)) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala new file mode 100644 index 00000000..06d7dfde --- /dev/null +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergeChangesets.scala @@ -0,0 +1,163 @@ +package osmesa.analytics.oneoffs + +import cats.data.{Validated, ValidatedNel} +import cats.implicits._ +import com.monovore.decline._ +import io.circe.generic.auto._ +import io.circe.{yaml, _} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.joda.time.DateTime +import org.joda.time.format.DateTimeFormat +import osmesa.analytics.Analytics +import vectorpipe.sources.{ChangesetSource, Source} +import vectorpipe.util.DBUtils + +import java.net.URI +import java.sql._ +import java.time.Instant +import scalaj.http.Http + +/* + * Usage example: + * + * sbt "project analytics" assembly + * + * spark-submit \ + * --class osmesa.analytics.oneoffs.MergeChangesets \ + * ingest/target/scala-2.11/osmesa-analytics.jar \ + * --changesets http://location/of/changeset/replications \ + * --end-time 1970-01-01T13:00:00Z + * s3://path/to/history.orc + * s3://path/to/output.orc + */ +object MergeChangesets + extends CommandApp( + name = "osmesa-merge-changesets", + header = "Bring existing changeset ORC file up to date 
using changeset stream", + main = { + + import MergeChangesetUtils._ + import ChangesetSource._ + + val changesetSourceOpt = + Opts + .option[URI]( + "changesets", + short = "c", + metavar = "uri", + help = "Location of replication changesets" + ) + .validate("Changeset source must have trailing '/'") { _.getPath.endsWith("/") } + + val endTimeOpt = + Opts + .option[Instant]("end-time", + short = "e", + metavar = "timestamp", + help = "Timestamp of stream end (of the form 2016-02-29T13:45:00Z); if absent, the time now will be used") + .orNone + + val orcArg = Opts + .argument[URI]("source ORC") + .validate("URI to ORC must have an s3 or file scheme") { _.getScheme != null } + .validate("orc must be an S3 or file Uri") { uri => + uri.getScheme.startsWith("s3") || uri.getScheme.startsWith("file") + } + .validate("orc must be an .orc file") { _.getPath.endsWith(".orc") } + + val outputArg = Opts.argument[URI]("destination ORC") + .validate("Output URI must have a scheme") { _.getScheme != null } + .validate("Output URI must have an S3 or file scheme") { uri => + uri.getScheme.startsWith("s3") || uri.getScheme.startsWith("file") + } + .validate("orc must be an .orc file") { _.getPath.endsWith(".orc") } + + (changesetSourceOpt, + endTimeOpt, + orcArg, + outputArg).mapN { + (changesetSource, endTime, orcUri, outputURI) => + implicit val spark: SparkSession = Analytics.sparkSession("MergeChangesets") + + import spark.implicits._ + + val df = spark.read.orc(orcUri.toString) + val lastModified = df.select(max(coalesce('closed_at, 'created_at))).first.getAs[Timestamp](0) + + val startSequence = findSequenceFor(lastModified.toInstant, changesetSource) + val endSequence = endTime.map(findSequenceFor(_, changesetSource)).getOrElse(getCurrentSequence(changesetSource).get.sequence) + + val options = Map( + Source.BaseURI -> changesetSource.toString, + Source.StartSequence -> startSequence.toString, + Source.EndSequence -> (endSequence + 1).toString // sequence range is (]; end sequence is exclusive + ) + + val changesets = spark.read.format(Source.Changesets).options(options).load + + // TODO: Clean up the following by providing and using a function in VP to coerce the + // column names into camel case (see https://github.com/geotrellis/vectorpipe/issues/113) + changesets + .drop("comments", "sequence") + .union(df.select( + 'id, + 'tags, + 'created_at as 'createdAt, + 'open, + 'closed_at as 'closedAt, + 'comments_count as 'commentsCount, + 'min_lat as 'minLat, + 'max_lat as 'maxLat, + 'min_lon as 'minLon, + 'max_lon as 'maxLon, + 'num_changes as 'numChanges, + 'uid, + 'user) + ) + .repartition(1) + .write + .orc(outputURI.toString) + + spark.stop() + } + } +) + +object MergeChangesetUtils { + implicit val readInstant: Argument[Instant] = new Argument[Instant] { + override def read(string: String): ValidatedNel[String, Instant] = { + try { Validated.valid(Instant.parse(string)) } + catch { case e: Exception => Validated.invalidNel(s"Invalid time: $string (${ e.getMessage })") } + } + + override def defaultMetavar: String = "time" + } + + private val formatter = DateTimeFormat.forPattern("y-M-d H:m:s.SSSSSSSSS Z") + + private implicit val dateTimeDecoder: Decoder[DateTime] = + Decoder.instance(a => a.as[String].map(DateTime.parse(_, formatter))) + + def saveLocations(procName: String, sequence: Int, databaseURI: URI) = { + var connection: Connection = null + try { + connection = DBUtils.getJdbcConnection(databaseURI) + val upsertSequence = + connection.prepareStatement( + """ + |INSERT INTO checkpoints 
(proc_name, sequence) + |VALUES (?, ?) + |ON CONFLICT (proc_name) + |DO UPDATE SET sequence = ? + """.stripMargin + ) + upsertSequence.setString(1, procName) + upsertSequence.setInt(2, sequence) + upsertSequence.setInt(3, sequence) + upsertSequence.execute() + } finally { + if (connection != null) connection.close() + } + } +} diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergedChangesetStreamProcessor.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergedChangesetStreamProcessor.scala index 93380f2a..4f9766a7 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergedChangesetStreamProcessor.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/MergedChangesetStreamProcessor.scala @@ -11,7 +11,7 @@ import org.apache.spark.sql.functions._ import vectorpipe.functions._ import vectorpipe.functions.osm._ import vectorpipe.model.ElementWithSequence -import vectorpipe.sources.Source +import vectorpipe.sources.{AugmentedDiffSource, Source} /* * Usage example: @@ -160,10 +160,7 @@ object MergedChangesetStreamProcessor ) val geomsWithWatermark = geoms - .withColumn( - "timestamp", - to_timestamp('sequence * 60 + 1347432900) - ) + .withColumn("timestamp", AugmentedDiffSource.sequenceToTimestamp('sequence)) // geoms are standalone; no need to wait for anything .withWatermark("timestamp", "0 seconds") .select('timestamp, 'changeset, '_type, 'id, 'version, 'minorVersion, 'updated) diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetMetadataUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetMetadataUpdater.scala index b145490d..91628d3c 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetMetadataUpdater.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetMetadataUpdater.scala @@ -24,8 +24,8 @@ import osmesa.analytics.stats.ChangesetMetadataForeachWriter */ object StreamingChangesetMetadataUpdater extends CommandApp( - name = "osmesa-augmented-diff-stream-processor", - header = "Update statistics from streaming augmented diffs", + name = "osmesa-changeset-stream-processor", + header = "Update statistics from changeset replication stream", main = { val changesetSourceOpt = Opts diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetStatsUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetStatsUpdater.scala index c9f9291f..f653d3b9 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetStatsUpdater.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingChangesetStatsUpdater.scala @@ -14,7 +14,7 @@ import vectorpipe.{internal => ProcessOSM} import vectorpipe.functions._ import vectorpipe.functions.osm._ import vectorpipe.model.ElementWithSequence -import vectorpipe.sources.Source +import vectorpipe.sources.{AugmentedDiffSource, Source} import vectorpipe.util.{DBUtils, Geocode} /* @@ -108,7 +108,7 @@ object StreamingChangesetStatsUpdater // in practice, this means that aggregation doesn't occur until the *next* sequence is received val query = Geocode(geoms.where(isTagged('tags))) - .withColumn("timestamp", to_timestamp('sequence * 60 + 1347432900)) + .withColumn("timestamp", AugmentedDiffSource.sequenceToTimestamp('sequence)) // if sequences are received sequentially (and atomically), 0 seconds should suffice; anything received with an // earlier timestamp after that point will be dropped 
.withWatermark("timestamp", "0 seconds") diff --git a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingFacetedEditHistogramTileUpdater.scala b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingFacetedEditHistogramTileUpdater.scala index 41420ea2..355a3c6e 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingFacetedEditHistogramTileUpdater.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/oneoffs/StreamingFacetedEditHistogramTileUpdater.scala @@ -13,7 +13,7 @@ import osmesa.analytics.{Analytics, EditHistogram} import vectorpipe.{internal => ProcessOSM} import vectorpipe.internal.{NodeType, WayType} import vectorpipe.functions.osm._ -import vectorpipe.sources.Source +import vectorpipe.sources.{AugmentedDiffSource, Source} object StreamingFacetedEditHistogramTileUpdater extends CommandApp( @@ -117,7 +117,7 @@ object StreamingFacetedEditHistogramTileUpdater .options(options) .load // convert sequence into timestamp - .withColumn("watermark", to_timestamp(from_unixtime('sequence * 60 + 1347432900))) + .withColumn("watermark", AugmentedDiffSource.sequenceToTimestamp('sequence)) .withWatermark("watermark", "0 seconds") val nodes = diffs diff --git a/src/analytics/src/main/scala/osmesa/analytics/stats/ChangesetStatsForeachWriter.scala b/src/analytics/src/main/scala/osmesa/analytics/stats/ChangesetStatsForeachWriter.scala index 28f5a0d9..e263e60e 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/stats/ChangesetStatsForeachWriter.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/stats/ChangesetStatsForeachWriter.scala @@ -23,7 +23,7 @@ class ChangesetStatsForeachWriter(databaseUri: URI, | ?::jsonb AS measurements, | ?::jsonb AS counts, | ? AS total_edits, - | ? AS augmented_diffs, + | ?::integer[] AS augmented_diffs, | current_timestamp AS updated_at |) |INSERT INTO changesets AS c ( @@ -41,7 +41,7 @@ class ChangesetStatsForeachWriter(databaseUri: URI, | measurements = ( | SELECT jsonb_object_agg(key, value) | FROM ( - | SELECT key, sum(value::numeric) AS value + | SELECT key, sum((value->>0)::numeric) AS value | FROM ( | SELECT * from jsonb_each(c.measurements) | UNION ALL @@ -54,7 +54,7 @@ class ChangesetStatsForeachWriter(databaseUri: URI, | counts = ( | SELECT jsonb_object_agg(key, value) | FROM ( - | SELECT key, sum(value::numeric) AS value + | SELECT key, sum((value->>0)::numeric) AS value | FROM ( | SELECT * from jsonb_each(c.counts) | UNION ALL diff --git a/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala b/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala index 7a07552a..2fa082ce 100644 --- a/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala +++ b/src/analytics/src/main/scala/osmesa/analytics/stats/package.scala @@ -46,13 +46,27 @@ package object stats { isCoastline(tags) or isPOI(tags) as 'isInterestingWay - def isLinear(tags: Column): Column = isRoad(tags) or isWaterway(tags) or isCoastline(tags) as 'isLinear + // Does this feature represent a rail-related site or area (not track) + def isRailFeature(tags: Column): Column = + array_contains(splitDelimitedValues(tags.getItem("railway")), "station") or + array_contains(splitDelimitedValues(tags.getItem("railway")), "yard") or + array_contains(splitDelimitedValues(tags.getItem("landuse")), "railway") as 'isRailSite + + // Does this feature represent a section of rail track + def isRailLine(tags: Column): Column = not(isRailFeature(tags)) and tags.getItem("railway").isNotNull as 'isRailLine + + // Does this feature 
represent a rail-related entity + def isRailway(tags: Column): Column = + tags.getItem("railway").isNotNull or array_contains(splitDelimitedValues(tags.getItem("landuse")), "railway") as 'isRailway + + def isLinear(tags: Column): Column = isRoad(tags) or isWaterway(tags) or isCoastline(tags) or isRailLine(tags) as 'isLinear def isOther(tags: Column): Column = isTagged(tags) and not(isRoad((tags))) and not(isWaterway(tags)) and not(isCoastline(tags)) and not(isBuilding(tags)) and + not(isRailway(tags)) and not(isPOI(tags)) as 'isOther def DefaultMeasurements(implicit sparkSession: SparkSession): Column = { @@ -67,7 +81,10 @@ package object stats { lit("waterway_km_deleted"), (isWaterway('tags) and !'visible).cast(IntegerType) * 'delta / 1000, lit("coastline_km_added"), (isCoastline('tags) and isNew('version, 'minorVersion)).cast(IntegerType) * 'delta / 1000, lit("coastline_km_modified"), (isCoastline('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType) * 'delta / 1000, - lit("coastline_km_deleted"), (isCoastline('tags) and !'visible).cast(IntegerType) * 'delta / 1000 + lit("coastline_km_deleted"), (isCoastline('tags) and !'visible).cast(IntegerType) * 'delta / 1000, + lit("railline_km_added"), (isRailLine('tags) and isNew('version, 'minorVersion)).cast(IntegerType) * 'delta / 1000, + lit("railline_km_modified"), (isRailLine('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType) * 'delta / 1000, + lit("railline_km_deleted"), (isRailLine('tags) and !'visible).cast(IntegerType) * 'delta / 1000 )) as 'measurements } @@ -87,6 +104,12 @@ package object stats { lit("buildings_added"), (isBuilding('tags) and isNew('version, 'minorVersion)).cast(IntegerType), lit("buildings_modified"), (isBuilding('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), lit("buildings_deleted"), (isBuilding('tags) and !'visible).cast(IntegerType), + lit("railway_features_added"), (isRailFeature('tags) and isNew('version, 'minorVersion)).cast(IntegerType), + lit("railway_features_modified"), (isRailFeature('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), + lit("railway_features_deleted"), (isRailFeature('tags) and !'visible).cast(IntegerType), + lit("raillines_added"), (isRailLine('tags) and isNew('version, 'minorVersion)).cast(IntegerType), + lit("raillines_modified"), (isRailLine('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), + lit("raillines_deleted"), (isRailLine('tags) and !'visible).cast(IntegerType), lit("pois_added"), (isPOI('tags) and isNew('version, 'minorVersion)).cast(IntegerType), lit("pois_modified"), (isPOI('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType), lit("pois_deleted"), (isPOI('tags) and !'visible).cast(IntegerType), diff --git a/src/project/Version.scala b/src/project/Version.scala index 3a11f020..64ca5a79 100644 --- a/src/project/Version.scala +++ b/src/project/Version.scala @@ -3,7 +3,7 @@ object Version { val osmesa = "0.1.0" val geotrellis = "2.1.0" val geomesa = "2.3.0" - val vectorpipe = "1.1.0-SNAPSHOT" + val vectorpipe = "1.1.0" val decline = "0.5.0" val cats = "1.0.0" val scalactic = "3.0.3"