diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/tsv/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/tsv/completion.json
index 99b711477..5e64d093f 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/tsv/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/tsv/completion.json
@@ -1,7 +1,7 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "SHREDDED",
       "types": [
@@ -113,8 +113,8 @@
       ]
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:30:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2021-10-13T20:21:47.595072674Z",
       "max": "2021-10-15T00:51:57.521746512Z"
     },
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/widerow/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/widerow/completion.json
index 7a79aecc4..5fd20cfcd 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/widerow/completion.json
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/widerow/completion.json
@@ -1,7 +1,7 @@
 {
   "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
   "data": {
-    "base": "output_path_placeholder/run=1970-01-01-10-30-00/",
+    "base": "output_path_placeholder",
     "typesInfo": {
       "transformation": "WIDEROW",
       "fileFormat": "JSON",
@@ -93,8 +93,8 @@
       ]
     },
     "timestamps": {
-      "jobStarted": "1970-01-01T10:30:00Z",
-      "jobCompleted": "1970-01-01T10:30:00Z",
+      "jobStarted": "job_started_placeholder",
+      "jobCompleted": "job_completed_placeholder",
       "min": "2021-10-13T20:21:47.595072674Z",
       "max": "2021-10-15T00:51:57.521746512Z"
     },
diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/bad b/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/bad
index bb8903ff9..ceb9d8952 100644
--- a/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/bad
+++ b/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/bad
@@ -1 +1 @@
-{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaCriterion":"iglu:org.schema/some_unknown_name/jsonschema/1-*-*","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client 
Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":"1970-01-01T10:30:00Z"}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan 
Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}} +{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaKey":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":""}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple 
Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}} \ No newline at end of file diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/completion.json index b9fa0bf91..511c0514b 100644 --- a/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/completion.json +++ b/modules/common-transformer-stream/src/test/resources/processing-spec/3/output/completion.json @@ -1,14 +1,14 @@ { "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1", "data": { - "base": "output_path_placeholder/run=1970-01-01-10-30-00/", + "base": "output_path_placeholder", "typesInfo": { "transformation": "SHREDDED", "types": [] }, "timestamps": { - "jobStarted": "1970-01-01T10:30:00Z", - "jobCompleted": "1970-01-01T10:30:00Z", + "jobStarted": "job_started_placeholder", + "jobCompleted": "job_completed_placeholder", "min": "2014-05-29T18:16:35Z", "max": "2014-05-29T18:16:35Z" }, diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/4/output/good/parquet/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/4/output/good/parquet/completion.json index c8d0470e8..a579a8d2e 100644 --- a/modules/common-transformer-stream/src/test/resources/processing-spec/4/output/good/parquet/completion.json +++ b/modules/common-transformer-stream/src/test/resources/processing-spec/4/output/good/parquet/completion.json @@ -1,7 +1,7 @@ { "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1", "data": { - "base": "output_path_placeholder/run=1970-01-01-10-30-00/", + "base": "output_path_placeholder", "typesInfo": { "transformation": "WIDEROW", "fileFormat": "PARQUET", @@ -93,8 +93,8 @@ ] }, "timestamps": { - "jobStarted": "1970-01-01T10:30:00Z", - "jobCompleted": "1970-01-01T10:30:00Z", + "jobStarted": "job_started_placeholder", + "jobCompleted": "job_completed_placeholder", "min": "2021-09-17T09:05:28.590000001Z", "max": "2021-10-15T09:06:27.101185600Z" }, diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/5/output/good/parquet/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/5/output/good/parquet/completion.json index e5054d6bc..59ac05ada 100644 --- a/modules/common-transformer-stream/src/test/resources/processing-spec/5/output/good/parquet/completion.json +++ 
b/modules/common-transformer-stream/src/test/resources/processing-spec/5/output/good/parquet/completion.json @@ -1,7 +1,7 @@ { "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1", "data": { - "base": "output_path_placeholder/run=1970-01-01-10-30-00/", + "base": "output_path_placeholder", "typesInfo": { "transformation": "WIDEROW", "fileFormat": "PARQUET", @@ -105,8 +105,8 @@ ] }, "timestamps": { - "jobStarted": "1970-01-01T10:30:00Z", - "jobCompleted": "1970-01-01T10:30:00Z", + "jobStarted": "job_started_placeholder", + "jobCompleted": "job_completed_placeholder", "min": "2022-02-01T22:14:21.648Z", "max": "2022-02-02T01:01:01.648Z" }, diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/6/output/good/parquet/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/6/output/good/parquet/completion.json index 173a79360..83397b2fe 100644 --- a/modules/common-transformer-stream/src/test/resources/processing-spec/6/output/good/parquet/completion.json +++ b/modules/common-transformer-stream/src/test/resources/processing-spec/6/output/good/parquet/completion.json @@ -1,7 +1,7 @@ { "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1", "data": { - "base": "output_path_placeholder/run=1970-01-01-10-30-00/", + "base": "output_path_placeholder", "typesInfo": { "transformation": "WIDEROW", "fileFormat": "PARQUET", @@ -101,8 +101,8 @@ ] }, "timestamps": { - "jobStarted": "1970-01-01T10:30:00Z", - "jobCompleted": "1970-01-01T10:30:00Z", + "jobStarted": "job_started_placeholder", + "jobCompleted": "job_completed_placeholder", "min": "2022-02-01T22:14:21.648Z", "max": "2022-02-02T01:01:01.648Z" }, diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/7/output/good/parquet/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/7/output/good/parquet/completion.json index 671dbfc33..477f0c7d5 100644 --- a/modules/common-transformer-stream/src/test/resources/processing-spec/7/output/good/parquet/completion.json +++ b/modules/common-transformer-stream/src/test/resources/processing-spec/7/output/good/parquet/completion.json @@ -1,7 +1,7 @@ { "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1", "data": { - "base": "output_path_placeholder/run=1970-01-01-10-30-00/", + "base": "output_path_placeholder", "typesInfo": { "transformation": "WIDEROW", "fileFormat": "PARQUET", @@ -21,8 +21,8 @@ ] }, "timestamps": { - "jobStarted": "1970-01-01T10:30:00Z", - "jobCompleted": "1970-01-01T10:30:00Z", + "jobStarted": "job_started_placeholder", + "jobCompleted": "job_completed_placeholder", "min": "2022-02-01T22:32:41.069Z", "max": "2022-02-02T01:01:01.648Z" }, diff --git a/modules/common-transformer-stream/src/test/resources/processing-spec/8/output/completion.json b/modules/common-transformer-stream/src/test/resources/processing-spec/8/output/completion.json index edef7c0b5..5fd20cfcd 100644 --- a/modules/common-transformer-stream/src/test/resources/processing-spec/8/output/completion.json +++ b/modules/common-transformer-stream/src/test/resources/processing-spec/8/output/completion.json @@ -1,7 +1,7 @@ { "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1", "data": { - "base": "output_path_placeholder/run=1970-01-01-10-30-00/", + "base": "output_path_placeholder", "typesInfo": { "transformation": "WIDEROW", 
"fileFormat": "JSON", @@ -93,8 +93,8 @@ ] }, "timestamps": { - "jobStarted": "1970-01-01T10:30:00Z", - "jobCompleted": "1970-01-01T10:31:00Z", + "jobStarted": "job_started_placeholder", + "jobCompleted": "job_completed_placeholder", "min": "2021-10-13T20:21:47.595072674Z", "max": "2021-10-15T00:51:57.521746512Z" }, diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/BaseProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/BaseProcessingSpec.scala index c4768b665..eb6df0c7d 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/BaseProcessingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/BaseProcessingSpec.scala @@ -13,6 +13,9 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.proce import cats.effect.{IO, Resource} import cats.effect.kernel.Ref +import io.circe.optics.JsonPath._ +import io.circe.parser.{parse => parseCirce} + import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.FileUtils @@ -58,11 +61,15 @@ trait BaseProcessingSpec extends Specification { } .reduce(_ and _) - protected def readMessageFromResource(resource: String, outputRootDirectory: Path) = + protected def readMessageFromResource(resource: String, outputRootDirectory: Path): IO[String] = ??? + + protected def readMessageFromResource(resource: String, completionMessageVars: BaseProcessingSpec.CompletionMessageVars) = readLinesFromResource(resource) .map(_.mkString) .map( - _.replace("output_path_placeholder", outputRootDirectory.toNioPath.toUri.toString.replaceAll("/+$", "")) + _.replace("output_path_placeholder", completionMessageVars.base.toNioPath.toUri.toString) + .replace("job_started_placeholder", completionMessageVars.jobStarted) + .replace("job_completed_placeholder", completionMessageVars.jobCompleted) .replace("version_placeholder", BuildInfo.version) .replace(" ", "") ) @@ -86,6 +93,15 @@ trait BaseProcessingSpec extends Specification { new String(encoder.encode(config.app.replace("file:/", "s3:/").getBytes)) ) } + + def extractCompletionMessageVars(processingOutput: BaseProcessingSpec.ProcessingOutput): BaseProcessingSpec.CompletionMessageVars = { + val message = processingOutput.completionMessages.head + val json = parseCirce(message).toOption.get + val base = root.data.base.string.getOption(json).get.stripPrefix("file://") + val jobStarted = root.data.timestamps.jobStarted.string.getOption(json).get + val jobCompleted = root.data.timestamps.jobCompleted.string.getOption(json).get + BaseProcessingSpec.CompletionMessageVars(Path(base), jobStarted, jobCompleted) + } } object BaseProcessingSpec { @@ -96,5 +112,13 @@ object BaseProcessingSpec { badrowsFromQueue: Vector[String], checkpointed: Int ) + final case class CompletionMessageVars( + base: Path, + jobStarted: String, + jobCompleted: String + ) { + def goodPath: Path = Path(s"$base/output=good") + def badPath: Path = Path(s"$base/output=bad") + } } diff --git 
a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/QueueBadSinkSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/QueueBadSinkSpec.scala index 3d1ae2eb1..eedfed54a 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/QueueBadSinkSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/QueueBadSinkSpec.scala @@ -11,7 +11,6 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing import cats.effect.unsafe.implicits.global -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.QueueBadSinkSpec._ import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig import fs2.io.file.Path @@ -39,12 +38,12 @@ class QueueBadSinkSpec extends BaseProcessingSpec { inputEventsPath = "/processing-spec/1/input/events" ) - val config = TransformerConfig(configFromPath(outputDirectory), igluConfig) - val badDirectory = outputDirectory.resolve(s"run=1970-01-01-10-30-00-${AppId.appId}/output=bad") + val config = TransformerConfig(configFromPath(outputDirectory), igluConfig) for { output <- process(inputStream, config) - badDirectoryExists <- pathExists(badDirectory) + compVars = extractCompletionMessageVars(output) + badDirectoryExists <- pathExists(compVars.badPath) expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad") } yield { val actualBadRows = output.badrowsFromQueue.toList @@ -98,7 +97,8 @@ object QueueBadSinkSpec { | "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0", | "data": { | "cacheSize": 500, - | "repositories": [] + | "repositories": [ + | ] | } |}""".stripMargin } diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala index e7d1efac6..671969620 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala @@ -10,9 +10,8 @@ */ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.ShredTsvProcessingSpec.{appConfig, igluConfig} +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.ShredTsvProcessingSpec._ import cats.effect.unsafe.implicits.global import fs2.io.file.Path @@ -31,43 +30,32 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec { for { output <- process(inputStream, config) + compVars = extractCompletionMessageVars(output) 
actualAtomicRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" - ) + compVars.goodPath / "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" ) actualOptimizelyRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0" - ) + compVars.goodPath / "vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0" ) actualConsentRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0" - ) + compVars.goodPath / "vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0" ) actualBadRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0" - ) + compVars.badPath / "vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0" ) - expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", outputDirectory) + expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", compVars) expectedAtomicRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-atomic") expectedOptimizelyRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.optimizely-state") expectedConsentRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-consent_document") expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad") } yield { - removeAppId(output.completionMessages.toList) must beEqualTo(Vector(expectedCompletionMessage)) + output.completionMessages.toList must beEqualTo(Vector(expectedCompletionMessage)) output.checkpointed must beEqualTo(1) assertStringRows(removeAppId(actualAtomicRows), expectedAtomicRows) @@ -91,28 +79,23 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec { for { output <- process(inputStream, config) + compVars = extractCompletionMessageVars(output) actualAtomicRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" - ) + compVars.goodPath / "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" ) actualBadRows <- readStringRowsFrom( - Path( - outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/revision=0/addition=0" - ) + compVars.badPath / "vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/revision=0/addition=0" ) - expectedCompletionMessage <- readMessageFromResource("/processing-spec/3/output/completion.json", outputDirectory) + expectedCompletionMessage <- readMessageFromResource("/processing-spec/3/output/completion.json", compVars) expectedBadRows <- 
readLinesFromResource("/processing-spec/3/output/bad") } yield { - removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage)) + output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage)) output.checkpointed must beEqualTo(1) actualAtomicRows.size must beEqualTo(1) - assertStringRows(removeAppId(actualBadRows), expectedBadRows) + assertStringRows(removeLastAttempt(removeAppId(actualBadRows)), expectedBadRows) } } .unsafeRunSync() @@ -156,4 +139,7 @@ object ShredTsvProcessingSpec { | "repositories": [] | } |}""".stripMargin + + def removeLastAttempt(badRows: List[String]): List[String] = + badRows.map(_.replaceAll(""""lastAttempt":".{20}"""", """"lastAttempt":""""")) } diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShutdownSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShutdownSpec.scala index c167c0821..ea65d29bc 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShutdownSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShutdownSpec.scala @@ -37,9 +37,10 @@ class ShutdownSpec extends BaseProcessingSpec { for { output <- runWithShutdown(inputStream, config) - expectedCompletionMessage <- readMessageFromResource("/processing-spec/8/output/completion.json", outputDirectory) + compVars = extractCompletionMessageVars(output) + expectedCompletionMessage <- readMessageFromResource("/processing-spec/8/output/completion.json", compVars) } yield { - removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage)) + output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage)) output.checkpointed must beEqualTo(1) } diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowJsonProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowJsonProcessingSpec.scala index 8c3f47b10..6cc16ec3f 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowJsonProcessingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowJsonProcessingSpec.scala @@ -10,7 +10,6 @@ */ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.WiderowJsonProcessingSpec.{appConfig, igluConfig} @@ -28,20 +27,19 @@ class WiderowJsonProcessingSpec extends BaseProcessingSpec { inputEventsPath = "/processing-spec/1/input/events" ) - val config = TransformerConfig(appConfig(outputDirectory), igluConfig) - val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good") - val badPath = Path(outputDirectory.toString + 
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad") + val config = TransformerConfig(appConfig(outputDirectory), igluConfig) for { output <- process(inputStream, config) - actualGoodRows <- readStringRowsFrom(goodPath) - actualBadRows <- readStringRowsFrom(badPath) + compVars = extractCompletionMessageVars(output) + actualGoodRows <- readStringRowsFrom(compVars.goodPath) + actualBadRows <- readStringRowsFrom(compVars.badPath) - expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/widerow/completion.json", outputDirectory) + expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/widerow/completion.json", compVars) expectedGoodRows <- readLinesFromResource("/processing-spec/1/output/good/widerow/events") expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad") } yield { - removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage)) + output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage)) output.checkpointed must beEqualTo(1) assertStringRows(actualGoodRows, expectedGoodRows) assertStringRows(actualBadRows, expectedBadRows) diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala index 6e5acc28b..5a6a3f95d 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala @@ -14,7 +14,7 @@ import cats.effect.IO import cats.effect.unsafe.implicits.global import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{AppId, ParquetUtils} +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils.{readParquetColumns, readParquetRowsAsJsonFrom} import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.WiderowParquetProcessingSpec.{appConfig, igluConfig} @@ -42,28 +42,26 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { inputEventsPath = "/processing-spec/4/input/events" // the same events as in resource file used in WideRowParquetSpec for batch transformer ) - - val config = TransformerConfig(appConfig(outputDirectory), igluConfig) - val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good") - val badPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad") + val config = TransformerConfig(appConfig(outputDirectory), igluConfig) for { output <- process(inputStream, config) + compVars = extractCompletionMessageVars(output) expectedParquetColumns <- readParquetColumnsFromResource( "/processing-spec/4/output/good/parquet/schema" ) // the same schema as in resource file used in WideRowParquetSpec for batch 
transformer - actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns) - actualParquetColumns = readParquetColumns(goodPath) - actualBadRows <- readStringRowsFrom(badPath) + actualParquetRows <- readParquetRowsAsJsonFrom(compVars.goodPath, expectedParquetColumns) + actualParquetColumns = readParquetColumns(compVars.goodPath) + actualBadRows <- readStringRowsFrom(compVars.badPath) - expectedCompletionMessage <- readMessageFromResource("/processing-spec/4/output/good/parquet/completion.json", outputDirectory) + expectedCompletionMessage <- readMessageFromResource("/processing-spec/4/output/good/parquet/completion.json", compVars) expectedBadRows <- readLinesFromResource("/processing-spec/4/output/badrows") expectedParquetRows <- readGoodParquetEventsFromResource("/processing-spec/4/input/events", columnToAdjust = None) } yield { actualParquetRows.size must beEqualTo(46) actualBadRows.size must beEqualTo(4) - removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage)) + output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage)) output.checkpointed must beEqualTo(1) assertParquetRows(actualParquetRows, expectedParquetRows) @@ -82,15 +80,15 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { "/processing-spec/5/input/input-events-custom-contexts" // the same events as in resource file used in WideRowParquetSpec for batch transformer ) - val config = TransformerConfig(appConfig(outputDirectory), igluConfig) - val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good") + val config = TransformerConfig(appConfig(outputDirectory), igluConfig) for { output <- process(inputStream, config) + compVars = extractCompletionMessageVars(output) expectedParquetColumns <- readParquetColumnsFromResource("/processing-spec/5/output/good/parquet/schema") - actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns) - actualParquetColumns = readParquetColumns(goodPath) - expectedCompletionMessage <- readMessageFromResource("/processing-spec/5/output/good/parquet/completion.json", outputDirectory) + actualParquetRows <- readParquetRowsAsJsonFrom(compVars.goodPath, expectedParquetColumns) + actualParquetColumns = readParquetColumns(compVars.goodPath) + expectedCompletionMessage <- readMessageFromResource("/processing-spec/5/output/good/parquet/completion.json", compVars) expectedParquetRows <- readGoodParquetEventsFromResource( "/processing-spec/5/input/input-events-custom-contexts", columnToAdjust = Some("contexts_com_snowplowanalytics_snowplow_parquet_test_a_1") @@ -98,7 +96,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { } yield { actualParquetRows.size must beEqualTo(100) - removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage)) + output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage)) output.checkpointed must beEqualTo(1) assertParquetRows(actualParquetRows, expectedParquetRows) @@ -115,15 +113,15 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { "/processing-spec/6/input/input-events-custom-unstruct" // the same events as in resource file used in WideRowParquetSpec for batch transformer ) - val config = TransformerConfig(appConfig(outputDirectory), igluConfig) - val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good") + val config = TransformerConfig(appConfig(outputDirectory), igluConfig) for { output <- 
process(inputStream, config) + compVars = extractCompletionMessageVars(output) expectedParquetColumns <- readParquetColumnsFromResource("/processing-spec/6/output/good/parquet/schema") - actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns) - actualParquetColumns = readParquetColumns(goodPath) - expectedCompletionMessage <- readMessageFromResource("/processing-spec/6/output/good/parquet/completion.json", outputDirectory) + actualParquetRows <- readParquetRowsAsJsonFrom(compVars.goodPath, expectedParquetColumns) + actualParquetColumns = readParquetColumns(compVars.goodPath) + expectedCompletionMessage <- readMessageFromResource("/processing-spec/6/output/good/parquet/completion.json", compVars) expectedParquetRows <- readGoodParquetEventsFromResource( "/processing-spec/6/input/input-events-custom-unstruct", @@ -132,7 +130,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { } yield { actualParquetRows.size must beEqualTo(100) - removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage)) + output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage)) output.checkpointed must beEqualTo(1) assertParquetRows(actualParquetRows, expectedParquetRows) @@ -149,21 +147,21 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { "/processing-spec/7/input/events" // the same events as in resource file used in WideRowParquetSpec for batch transformer ) - val config = TransformerConfig(appConfig(outputDirectory), igluConfig) - val goodPath = Path(outputDirectory.toString + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good") + val config = TransformerConfig(appConfig(outputDirectory), igluConfig) for { output <- process(inputStream, config) + compVars = extractCompletionMessageVars(output) expectedParquetColumns <- readParquetColumnsFromResource( "/processing-spec/7/output/good/parquet/schema" ) // the same schema as in resource file used in WideRowParquetSpec for batch transformer - actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns) - actualParquetColumns = readParquetColumns(goodPath) - expectedCompletionMessage <- readMessageFromResource("/processing-spec/7/output/good/parquet/completion.json", outputDirectory) + actualParquetRows <- readParquetRowsAsJsonFrom(compVars.goodPath, expectedParquetColumns) + actualParquetColumns = readParquetColumns(compVars.goodPath) + expectedCompletionMessage <- readMessageFromResource("/processing-spec/7/output/good/parquet/completion.json", compVars) } yield { actualParquetRows.size must beEqualTo(3) - removeAppId(output.completionMessages.toList) must beEqualTo(List(expectedCompletionMessage)) + output.completionMessages.toList must beEqualTo(List(expectedCompletionMessage)) output.checkpointed must beEqualTo(1) forall( diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala index 82d16152a..f417b8614 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala @@ -335,16 +335,6 @@ object ShredJobSpec { |"cacheSize": 500, |"repositories": [ |{ - |"name": "Local Iglu Server", - |"priority": 0, - |"vendorPrefixes": [ "com.snowplowanalytics" ], - 
-|"connection": {
-|"http": {
-|"uri": "http://localhost:8080/api"
-|}
-|}
-|},
-|{
 |"name": "Iglu Central",
 |"priority": 0,
 |"vendorPrefixes": [ "com.snowplowanalytics" ],
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index a3125cf4c..fca329f8b 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -178,6 +178,7 @@ object BuildSettings {
     "iglu:com.segment/screen/jsonschema/1-0-0",
     "iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0",
     "iglu:com.snowplowanalytics.snowplow/change_form/jsonschema/1-0-0",
+    "iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-0",
     "iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-1",
     "iglu:com.snowplowanalytics.snowplow/consent_document/jsonschema/1-0-0",
     "iglu:com.snowplowanalytics.snowplow/consent_withdrawn/jsonschema/1-0-0",