From c68ab59fd985a018daf44e21c102fc6af0d08a4d Mon Sep 17 00:00:00 2001 From: Timothy Potter Date: Mon, 22 Jun 2020 09:13:16 -0600 Subject: [PATCH 1/9] Port to Spark 3 --- pom.xml | 68 ++++---- .../java/com/lucidworks/spark/SparkApp.java | 10 +- .../spark/example/ml/MLPipeline.java | 5 +- .../DocumentFilteringStreamProcessor.java | 147 ------------------ .../TwitterToSolrStreamProcessor.java | 98 ------------ .../example/events/EventsimIndexer.scala | 6 +- .../spark/example/ml/MLPipelineScala.scala | 5 +- .../lucidworks/spark/rdd/SelectSolrRDD.scala | 14 +- .../spark/rdd/StreamingSolrRDD.scala | 7 +- .../fusion/FusionPipelineClientTest.java | 6 +- src/test/resources/conf/solrconfig.xml | 2 +- .../TestPartitionByTimeQuerySupport.scala | 9 +- .../spark/examples/TwitterTestSuite.scala | 26 ---- 13 files changed, 63 insertions(+), 340 deletions(-) delete mode 100644 src/main/java/com/lucidworks/spark/example/streaming/DocumentFilteringStreamProcessor.java delete mode 100644 src/main/java/com/lucidworks/spark/example/streaming/TwitterToSolrStreamProcessor.java delete mode 100644 src/test/scala/com/lucidworks/spark/examples/TwitterTestSuite.scala diff --git a/pom.xml b/pom.xml index 213d55b7..a78aaca8 100644 --- a/pom.xml +++ b/pom.xml @@ -36,14 +36,14 @@ UTF-8 - 1.8 - 2.4.5 + 11 + 3.0.0 8.4.1 2.7.5 - 2.10.1 - 2.11.12 - 2.11 - 1.1.1 + 2.11.0 + 2.12.10 + 2.12 + 1.4.1 128m @@ -78,7 +78,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 2.10.3 + 3.2.0 attach-javadocs @@ -98,7 +98,7 @@ verify sign - + --pinentry-mode @@ -121,7 +121,7 @@ org.jacoco jacoco-maven-plugin - 0.7.5.201505241946 + 0.8.5 com/lucidworks/spark/example/** @@ -156,9 +156,6 @@ org.apache.maven.plugins maven-surefire-plugin 2.18.1 - - ${argLine} -XX:MaxPermSize=${MaxPermSize} - @@ -169,7 +166,7 @@ net.alchim31.maven scala-maven-plugin - 3.2.2 + 4.4.0 eclipse-add-source @@ -213,7 +210,7 @@ maven-compiler-plugin - 3.0 + 3.8.1 ${java.version} ${java.version} @@ -222,16 +219,13 @@ org.codehaus.mojo versions-maven-plugin - 2.3 + 2.7 org.apache.maven.plugins maven-surefire-plugin - 2.18.1 - - -XX:MaxPermSize=${MaxPermSize} - + 2.22.2 @@ -372,7 +366,7 @@ org.apache.http shaded.apache.http - + @@ -446,7 +440,7 @@ com.esotericsoftware kryo-shaded - 3.0.3 + 4.0.2 compile @@ -460,20 +454,15 @@ spark-mllib_${scala.binary.version} ${spark.version} - - org.apache.bahir - spark-streaming-twitter_${scala.binary.version} - 2.0.1 - commons-cli commons-cli - 1.3.1 + 1.4 commons-io commons-io - 2.5 + 2.7 org.apache.solr @@ -594,12 +583,12 @@ org.apache.commons commons-lang3 - 3.5 + 3.9 junit junit - 4.12 + 4.13 test @@ -639,7 +628,7 @@ org.scalatest scalatest_${scala.binary.version} - 3.0.3 + 3.0.8 test @@ -650,19 +639,20 @@ org.apache.commons commons-compress - 1.19 + 1.20 - + + - com.github.tomakehurst - wiremock - 1.56 - - standalone + com.github.tomakehurst + wiremock-standalone + 2.26.3 org.mortbay.jetty diff --git a/src/main/java/com/lucidworks/spark/SparkApp.java b/src/main/java/com/lucidworks/spark/SparkApp.java index 1d76c7de..5a595118 100644 --- a/src/main/java/com/lucidworks/spark/SparkApp.java +++ b/src/main/java/com/lucidworks/spark/SparkApp.java @@ -23,8 +23,6 @@ import com.lucidworks.spark.example.hadoop.Logs2SolrRDDProcessor; import com.lucidworks.spark.example.query.KMeansAnomaly; import com.lucidworks.spark.example.query.*; -import com.lucidworks.spark.example.streaming.DocumentFilteringStreamProcessor; -import com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor; import org.apache.commons.cli.CommandLine; import 
org.apache.commons.cli.GnuParser; @@ -241,14 +239,10 @@ private static RDDProcessor newProcessor(String streamProcType) throws Exception streamProcType = streamProcType.trim(); - if ("twitter-to-solr".equals(streamProcType)) - return new TwitterToSolrStreamProcessor(); - else if ("word-count".equals(streamProcType)) + if ("word-count".equals(streamProcType)) return new WordCount(); else if ("term-vectors".equals(streamProcType)) return new ReadTermVectors(); - else if ("docfilter".equals(streamProcType)) - return new DocumentFilteringStreamProcessor(); else if ("hdfs-to-solr".equals(streamProcType)) return new HdfsToSolrRDDProcessor(); else if ("logs2solr".equals(streamProcType)) @@ -278,10 +272,8 @@ else if ("eventsim".equals(streamProcType)) private static void displayProcessorOptions(PrintStream out) throws Exception { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("twitter-to-solr", getProcessorOptions(new TwitterToSolrStreamProcessor())); formatter.printHelp("word-count", getProcessorOptions(new WordCount())); formatter.printHelp("term-vectors", getProcessorOptions(new ReadTermVectors())); - formatter.printHelp("docfilter", getProcessorOptions(new DocumentFilteringStreamProcessor())); formatter.printHelp("hdfs-to-solr", getProcessorOptions(new HdfsToSolrRDDProcessor())); formatter.printHelp("logs2solr", getProcessorOptions(new Logs2SolrRDDProcessor())); formatter.printHelp("query-solr-benchmark", getProcessorOptions(new QueryBenchmark())); diff --git a/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java b/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java index 36b084be..904d1c04 100644 --- a/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java +++ b/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java @@ -218,8 +218,9 @@ public int run(SparkConf conf, CommandLine cli) throws Exception { System.out.println(confusionMatrix); // compute the false positive rate per label - System.out.println(); - System.out.println("F-Measure: "+metrics.fMeasure()); + //System.out.println(); + // TODO: Spark3 + //System.out.println("F-Measure: "+metrics.fMeasure()); System.out.println("label\tfpr\n"); String[] labels = labelConverter.getLabels(); diff --git a/src/main/java/com/lucidworks/spark/example/streaming/DocumentFilteringStreamProcessor.java b/src/main/java/com/lucidworks/spark/example/streaming/DocumentFilteringStreamProcessor.java deleted file mode 100644 index b3ea50e7..00000000 --- a/src/main/java/com/lucidworks/spark/example/streaming/DocumentFilteringStreamProcessor.java +++ /dev/null @@ -1,147 +0,0 @@ -package com.lucidworks.spark.example.streaming; - -import com.lucidworks.spark.util.SolrSupport; -import com.lucidworks.spark.SparkApp; -import com.lucidworks.spark.filter.DocFilterContext; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.log4j.Logger; -import org.apache.solr.client.solrj.SolrQuery; -import org.apache.solr.common.SolrInputDocument; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.dstream.DStream; -import org.apache.spark.streaming.twitter.TwitterUtils; -import twitter4j.Status; - -import java.util.ArrayList; -import java.util.List; - -/** - * Example showing how to match documents against a 
set of known queries; useful - * for doing things like alerts, etc. - */ -public class DocumentFilteringStreamProcessor extends SparkApp.StreamProcessor { - - public static Logger log = Logger.getLogger(DocumentFilteringStreamProcessor.class); - - /** - * A DocFilterContext is responsible for loading queries from some external system and - * then doing something with each doc that is matched to a query. - */ - class ExampleDocFilterContextImpl implements DocFilterContext { - - public void init(JavaStreamingContext jssc, CommandLine cli) { - // nothing to init for this basic impl - } - - public String getDocIdFieldName() { return "id"; } - - public List getQueries() { - List queryList = new ArrayList(); - - // a real impl would pull queries from an external system, such as Solr or a DB or a file - SolrQuery q1 = new SolrQuery("type_s:post"); - q1.setParam("_qid_", "POSTS"); // identify the query when tagging matching docs - queryList.add(q1); - - SolrQuery q2 = new SolrQuery("type_s:echo"); - q2.setParam("_qid_", "ECHOS"); - queryList.add(q2); - - return queryList; - } - - public void onMatch(SolrQuery query, SolrInputDocument inputDoc) { - String[] qids = query.getParams("_qid_"); - if (qids == null || qids.length < 1) return; // not one of ours - - if (log.isDebugEnabled()) - log.debug("document [" + inputDoc.getFieldValue("id") + "] matches query: " + qids[0]); - - // just index the matching query for later analysis - inputDoc.addField("_qid_ss", qids[0]); - } - } - - public String getName() { return "docfilter"; } - - @Override - public void setup(JavaStreamingContext jssc, CommandLine cli) throws Exception { - - // load the DocFilterContext implementation, which knows how to load queries - DocFilterContext docFilterContext = loadDocFilterContext(jssc, cli); - final String idFieldName = docFilterContext.getDocIdFieldName(); - - // start receiving a stream of tweets ... - String filtersArg = cli.getOptionValue("tweetFilters"); - String[] filters = (filtersArg != null) ? filtersArg.split(",") : new String[0]; - JavaReceiverInputDStream tweets = TwitterUtils.createStream(jssc, null, filters); - - // map incoming tweets into SolrInputDocument objects for indexing in Solr - JavaDStream docs = tweets.map( - new Function() { - public SolrInputDocument call(Status status) { - SolrInputDocument doc = - SolrSupport.autoMapToSolrInputDoc(idFieldName, "tweet-"+status.getId(), status, null); - doc.setField("provider_s", "twitter"); - doc.setField("author_s", status.getUser().getScreenName()); - doc.setField("type_s", status.isRetweet() ? 
"echo" : "post"); - return doc; - } - } - ); - - // run each doc through a list of filters pulled from our DocFilterContext - String filterCollection = cli.getOptionValue("filterCollection", collection); - DStream enriched = - SolrSupport.filterDocuments(docFilterContext, zkHost, filterCollection, docs.dstream()); - - // now index the enriched docs into Solr (or do whatever after the matching process runs) - SolrSupport.indexDStreamOfDocs(zkHost, collection, batchSize, enriched); - } - - protected DocFilterContext loadDocFilterContext(JavaStreamingContext jssc, CommandLine cli) - throws Exception - { - DocFilterContext ctxt = null; - String docFilterContextImplClass = cli.getOptionValue("docFilterContextImplClass"); - if (docFilterContextImplClass != null) { - Class implClass = - (Class)getClass().getClassLoader().loadClass(docFilterContextImplClass); - ctxt = implClass.newInstance(); - } else { - ctxt = new ExampleDocFilterContextImpl(); - } - ctxt.init(jssc, cli); - return ctxt; - } - - public Option[] getOptions() { - return new Option[]{ - OptionBuilder - .withArgName("LIST") - .hasArg() - .isRequired(false) - .withDescription("List of Twitter keywords to filter on, separated by commas") - .create("tweetFilters"), - OptionBuilder - .withArgName("NAME") - .hasArg() - .isRequired(false) - .withDescription("Collection to pull configuration files to create an " + - "EmbeddedSolrServer for document matching; defaults to the value of the collection option.") - .create("filterCollection"), - OptionBuilder - .withArgName("CLASS") - .hasArg() - .isRequired(false) - .withDescription("Name of the DocFilterContext implementation class; defaults to an internal example impl: "+ - ExampleDocFilterContextImpl.class.getName()) - .create("docFilterContextImplClass") - }; - } -} diff --git a/src/main/java/com/lucidworks/spark/example/streaming/TwitterToSolrStreamProcessor.java b/src/main/java/com/lucidworks/spark/example/streaming/TwitterToSolrStreamProcessor.java deleted file mode 100644 index b2005f3a..00000000 --- a/src/main/java/com/lucidworks/spark/example/streaming/TwitterToSolrStreamProcessor.java +++ /dev/null @@ -1,98 +0,0 @@ -package com.lucidworks.spark.example.streaming; - -import com.lucidworks.spark.SparkApp; -import com.lucidworks.spark.util.SolrSupport; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.log4j.Logger; -import org.apache.solr.common.SolrInputDocument; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.twitter.TwitterUtils; -import twitter4j.Status; - -/** - * Simple example of indexing tweets into Solr using Spark streaming; be sure to update the - * twitter4j.properties file on the classpath with your Twitter API credentials. - */ -public class TwitterToSolrStreamProcessor extends SparkApp.StreamProcessor { - - public static Logger log = Logger.getLogger(TwitterToSolrStreamProcessor.class); - - public String getName() { return "twitter-to-solr"; } - - /** - * Sends a stream of tweets to Solr. - */ - @Override - public void setup(JavaStreamingContext jssc, CommandLine cli) throws Exception { - String filtersArg = cli.getOptionValue("tweetFilters"); - String[] filters = (filtersArg != null) ? 
filtersArg.split(",") : new String[0]; - - // start receiving a stream of tweets ... - JavaReceiverInputDStream tweets = - TwitterUtils.createStream(jssc, null, filters); - - String fusionUrl = cli.getOptionValue("fusion"); - if (fusionUrl != null) { - // just send JSON directly to Fusion - SolrSupport.sendDStreamOfDocsToFusion(fusionUrl, cli.getOptionValue("fusionCredentials"), tweets.dstream(), batchSize); - } else { - // map incoming tweets into PipelineDocument objects for indexing in Solr - JavaDStream docs = tweets.map( - new Function() { - - /** - * Convert a twitter4j Status object into a SolrJ SolrInputDocument - */ - public SolrInputDocument call(Status status) { - - if (log.isDebugEnabled()) { - log.debug("Received tweet: " + status.getId() + ": " + status.getText().replaceAll("\\s+", " ")); - } - - // simple mapping from primitives to dynamic Solr fields using reflection - SolrInputDocument doc = - SolrSupport.autoMapToSolrInputDoc("tweet-"+status.getId(), status, null); - doc.setField("provider_s", "twitter"); - doc.setField("author_s", status.getUser().getScreenName()); - doc.setField("type_s", status.isRetweet() ? "echo" : "post"); - - if (log.isDebugEnabled()) - log.debug("Transformed document: " + doc.toString()); - return doc; - } - } - ); - - // when ready, send the docs into a SolrCloud cluster - SolrSupport.indexDStreamOfDocs(zkHost, collection, batchSize, docs.dstream()); - } - } - - public Option[] getOptions() { - return new Option[]{ - OptionBuilder - .withArgName("LIST") - .hasArg() - .isRequired(false) - .withDescription("List of Twitter keywords to filter on, separated by commas") - .create("tweetFilters"), - OptionBuilder - .withArgName("URL(s)") - .hasArg() - .isRequired(false) - .withDescription("Fusion endpoint") - .create("fusion"), - OptionBuilder - .withArgName("user:password:realm") - .hasArg() - .isRequired(false) - .withDescription("Fusion credentials user:password:realm") - .create("fusionCredentials") - }; - } -} diff --git a/src/main/scala/com/lucidworks/spark/example/events/EventsimIndexer.scala b/src/main/scala/com/lucidworks/spark/example/events/EventsimIndexer.scala index 202dee8b..6e190ffa 100644 --- a/src/main/scala/com/lucidworks/spark/example/events/EventsimIndexer.scala +++ b/src/main/scala/com/lucidworks/spark/example/events/EventsimIndexer.scala @@ -7,7 +7,7 @@ import com.lucidworks.spark.SparkApp.RDDProcessor import com.lucidworks.spark.fusion.FusionPipelineClient import org.apache.commons.cli.{CommandLine, Option} import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import scala.collection.JavaConversions.bufferAsJavaList import scala.collection.mutable.ListBuffer @@ -70,7 +70,9 @@ class EventsimIndexer extends RDDProcessor { val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate() - sparkSession.read.json(cli.getOptionValue("eventsimJson")).foreachPartition(rows => { + val df : DataFrame = sparkSession.read.json(cli.getOptionValue("eventsimJson")) + + df.foreachPartition((rows : Iterator[Row]) => { val fusion: FusionPipelineClient = if (fusionAuthEnabled) new FusionPipelineClient(fusionEndpoints, fusionUser, fusionPass, fusionRealm) else new FusionPipelineClient(fusionEndpoints) diff --git a/src/main/scala/com/lucidworks/spark/example/ml/MLPipelineScala.scala b/src/main/scala/com/lucidworks/spark/example/ml/MLPipelineScala.scala index 453f9744..51e74ae1 100644 --- 
a/src/main/scala/com/lucidworks/spark/example/ml/MLPipelineScala.scala +++ b/src/main/scala/com/lucidworks/spark/example/ml/MLPipelineScala.scala @@ -156,8 +156,9 @@ class MLPipelineScala extends SparkApp.RDDProcessor { |${metrics.confusionMatrix}\n""".stripMargin) // compute the false positive rate per label - println(s"""\nF-Measure: ${metrics.fMeasure} - |label\tfpr\n""".stripMargin) + // TODO: Spark3 + //println(s"""\nF-Measure: ${metrics.fMeasure} + // |label\tfpr\n""".stripMargin) val labels = labelConverter.getLabels for (i <- labels.indices) println(s"${labels(i)}\t${metrics.falsePositiveRate(i.toDouble)}") diff --git a/src/main/scala/com/lucidworks/spark/rdd/SelectSolrRDD.scala b/src/main/scala/com/lucidworks/spark/rdd/SelectSolrRDD.scala index 326134b3..7e1483c4 100644 --- a/src/main/scala/com/lucidworks/spark/rdd/SelectSolrRDD.scala +++ b/src/main/scala/com/lucidworks/spark/rdd/SelectSolrRDD.scala @@ -54,9 +54,10 @@ class SelectSolrRDD( query.setRequestHandler(solrRequestHandler) logger.debug(s"Using cursorMarks to fetch documents from ${partition.preferredReplica} for query: ${partition.query}") val resultsIterator = new StreamingResultsIterator(SolrSupport.getCachedHttpSolrClient(url, zkHost), partition.query, partition.cursorMark) - context.addTaskCompletionListener { (context) => - logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from shard $url for partition ${split.index}") - } + // TODO: Spark3 + //context.addTaskCompletionListener { (context: TaskContext) => + // logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from shard $url for partition ${split.index}") + //} resultsIterator } case p: SolrLimitPartition => { @@ -69,9 +70,10 @@ class SelectSolrRDD( query.setStart(null) // important! must start as null else the Iterator will advance the start position by the row size val resultsIterator = new StreamingResultsIterator(SolrSupport.getCachedCloudClient(p.zkhost), query) resultsIterator.setMaxSampleDocs(p.maxRows) - context.addTaskCompletionListener { (context) => - logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from the limit (${p.maxRows}) partition of ${p.collection}") - } + // TODO: Spark3 + //context.addTaskCompletionListener { (context: TaskContext) => + // logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from the limit (${p.maxRows}) partition of ${p.collection}") + //} resultsIterator } case partition: AnyRef => throw new Exception("Unknown partition type '" + partition.getClass) diff --git a/src/main/scala/com/lucidworks/spark/rdd/StreamingSolrRDD.scala b/src/main/scala/com/lucidworks/spark/rdd/StreamingSolrRDD.scala index 7dcd78f3..baa839e0 100644 --- a/src/main/scala/com/lucidworks/spark/rdd/StreamingSolrRDD.scala +++ b/src/main/scala/com/lucidworks/spark/rdd/StreamingSolrRDD.scala @@ -90,9 +90,10 @@ class StreamingSolrRDD( query.setRequestHandler(solrRequestHandler) logger.debug(s"Using export handler to fetch documents from ${partition.preferredReplica} for query: ${partition.query}") val resultsIterator = getExportHandlerBasedIterator(url, query, partition.numWorkers, partition.workerId) - context.addTaskCompletionListener { (context) => - logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from shard $url for partition ${split.index}") - } + // TODO: Spark3 + //context.addTaskCompletionListener { (context) => + // logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from shard $url for partition ${split.index}") + //} resultsIterator case partition: AnyRef => throw new Exception("Unknown partition type '" + 
partition.getClass) } diff --git a/src/test/java/com/lucidworks/spark/fusion/FusionPipelineClientTest.java b/src/test/java/com/lucidworks/spark/fusion/FusionPipelineClientTest.java index e76f5528..e9238047 100644 --- a/src/test/java/com/lucidworks/spark/fusion/FusionPipelineClientTest.java +++ b/src/test/java/com/lucidworks/spark/fusion/FusionPipelineClientTest.java @@ -135,15 +135,15 @@ public void testHappyPath() throws Exception { if (useWireMockRule) { // mock out the Pipeline API // stubFor(post(urlEqualTo("/api/apollo/index-pipelines")).willReturn(aResponse().withStatus(200))); - stubFor(post(urlEqualTo(fusionPipelineUrlWithoutHostAndPort)).willReturn(aResponse().withStatus(200))); + stubFor(post(urlEqualTo(fusionPipelineUrlWithoutHostAndPort+"?echo=false")).willReturn(aResponse().withStatus(200))); stubFor(get(urlEqualTo(fusionProxyBaseUrl + fusionIndexingPipelineUrlExtension)).willReturn(aResponse().withStatus(200).withBody("hello"))); // a bad node in the mix ... to test FusionPipelineClient error handling - stubFor(post(urlEqualTo(badPath)).willReturn(aResponse().withStatus(500))); + stubFor(post(urlEqualTo(badPath+"?echo=false")).willReturn(aResponse().withStatus(500))); // another bad node in the mix which produces un-authorized errors... to test FusionPipelineClient error handling - stubFor(post(urlEqualTo(unauthPath)).willReturn(aResponse().withStatus(401))); + stubFor(post(urlEqualTo(unauthPath+"?echo=false")).willReturn(aResponse().withStatus(401))); // mock out the Session API stubFor(post(urlEqualTo("/api/session?realmName=" + fusionRealm)).willReturn(aResponse().withStatus(200))); diff --git a/src/test/resources/conf/solrconfig.xml b/src/test/resources/conf/solrconfig.xml index f6c8525d..b90d425b 100755 --- a/src/test/resources/conf/solrconfig.xml +++ b/src/test/resources/conf/solrconfig.xml @@ -1,6 +1,6 @@ - 6.4.2 + LATEST ${solr.data.dir:} diff --git a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala index 3080873d..4f626950 100644 --- a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala +++ b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala @@ -5,7 +5,8 @@ import org.apache.solr.client.solrj.SolrQuery import org.apache.spark.sql.SaveMode._ import com.lucidworks.spark.util.ConfigurationConstants._ import org.apache.solr.client.solrj.request.CollectionAdminRequest -import org.apache.solr.client.solrj.request.CollectionAdminRequest.{CreateAlias, DeleteAlias} +import org.apache.spark.sql.functions._ + /** This class is used to test the PartitionByTimeQuerySupport class */ @@ -24,7 +25,11 @@ class TestPartitionByTimeQuerySupport extends TestSuiteBuilder { val jsonDF = sparkSession.read.json(jsonFileLocation) assert(jsonDF.count == 100) - var col1DF=jsonDF.filter(jsonDF("timestamp_tdt") >= "2014-11-24T17:30" && jsonDF("timestamp_tdt") < "2014-11-24T17:31") + jsonDF.printSchema() + jsonDF.withColumn("timestamp_tdt", to_utc_timestamp(col("timestamp_tdt"),"UTC")).withColumn("ts", to_timestamp(lit("2014-11-24T17:30"),"yyyy-MM-dd'T'HH:mm")) + .select("timestamp_tdt", "ts").collectAsList().forEach(println(_)) + + var col1DF=jsonDF.filter(jsonDF("timestamp_tdt") >= to_timestamp(lit("2014-11-24T17:30"),"yyyy-MM-dd'T'HH:mm") && jsonDF("timestamp_tdt") < to_timestamp(lit("2014-11-24T17:31"), "yyyy-MM-dd'T'HH:mm")) assert(col1DF.count == 32) col1DF=col1DF.drop(col1DF("_version_")) var col2DF=jsonDF.filter(jsonDF("timestamp_tdt") >= 
"2014-11-24T17:31" && jsonDF("timestamp_tdt") < "2014-11-24T17:32") diff --git a/src/test/scala/com/lucidworks/spark/examples/TwitterTestSuite.scala b/src/test/scala/com/lucidworks/spark/examples/TwitterTestSuite.scala deleted file mode 100644 index d1f5c0a0..00000000 --- a/src/test/scala/com/lucidworks/spark/examples/TwitterTestSuite.scala +++ /dev/null @@ -1,26 +0,0 @@ -package com.lucidworks.spark.examples - -import com.lucidworks.spark.SparkSolrFunSuite -import com.lucidworks.spark.util.SolrSupport -import org.apache.solr.common.SolrInputDocument -import twitter4j.TwitterObjectFactory - -class TwitterTestSuite extends SparkSolrFunSuite { - - test("Test twitter field object mapping") { - val tweetJSON = "{ \"scopes\":{ \"place_ids\":[ \"place one\",\"place two\"]}, \"created_at\":\"Tue Mar 05 23:57:32 +0000 2013\", \"id\":309090333021581313, \"id_str\":\"309090333021581313\", \"text\":\"As announced, @anywhere has been retired per https:\\/\\/t.co\\/bWXjhurvwp The js file now logs a message to the console and exits quietly. ^ARK\", \"source\":\"web\", \"truncated\":false, \"in_reply_to_status_id\":null, \"in_reply_to_status_id_str\":null, \"in_reply_to_user_id\":null, \"in_reply_to_user_id_str\":null, \"in_reply_to_screen_name\":null, \"user\":{ \"id\":6253282, \"id_str\":\"6253282\", \"name\":\"Twitter API\", \"screen_name\":\"twitterapi\", \"location\":\"San Francisco, CA\", \"description\":\"The Real Twitter API. I tweet about API changes, service issues and happily answer questions about Twitter and our API. Don't get an answer? It's on my website.\", \"url\":\"http:\\/\\/dev.twitter.com\", \"entities\":{ \"url\":{ \"urls\":[ { \"url\":\"http:\\/\\/dev.twitter.com\", \"expanded_url\":null, \"indices\":[ 0, 22 ] } ] }, \"description\":{ \"urls\":[ ] } }, \"protected\":false, \"followers_count\":1533137, \"friends_count\":33, \"listed_count\":11369, \"created_at\":\"Wed May 23 06:01:13 +0000 2007\", \"favourites_count\":25, \"utc_offset\":-28800, \"time_zone\":\"Pacific Time (US & Canada)\", \"geo_enabled\":true, \"verified\":true, \"statuses_count\":3392, \"lang\":\"en\", \"contributors_enabled\":true, \"is_translator\":false, \"profile_background_color\":\"C0DEED\", \"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/656927849\\/miyt9dpjz77sc0w3d4vj.png\", \"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/656927849\\/miyt9dpjz77sc0w3d4vj.png\", \"profile_background_tile\":true, \"profile_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_images\\/2284174872\\/7df3h38zabcvjylnyfe3_normal.png\", \"profile_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_images\\/2284174872\\/7df3h38zabcvjylnyfe3_normal.png\", \"profile_banner_url\":\"https:\\/\\/si0.twimg.com\\/profile_banners\\/6253282\\/1347394302\", \"profile_link_color\":\"0084B4\", \"profile_sidebar_border_color\":\"C0DEED\", \"profile_sidebar_fill_color\":\"DDEEF6\", \"profile_text_color\":\"333333\", \"profile_use_background_image\":true, \"default_profile\":false, \"default_profile_image\":false, \"following\":null, \"follow_request_sent\":false, \"notifications\":null }, \"geo\":null, \"coordinates\":null, \"place\":null, \"contributors\":[ 7588892 ], \"retweet_count\":74, \"entities\":{ \"hashtags\":[ ], \"urls\":[ { \"url\":\"https:\\/\\/t.co\\/bWXjhurvwp\", \"expanded_url\":\"https:\\/\\/dev.twitter.com\\/blog\\/sunsetting-anywhere\", \"display_url\":\"dev.twitter.com\\/blog\\/sunsettin…\", \"indices\":[ 45, 68 ] } ], 
\"user_mentions\":[ { \"screen_name\":\"anywhere\", \"name\":\"Anywhere\", \"id\":9576402, \"id_str\":\"9576402\", \"indices\":[ 14, 23 ] } ] }, \"favorited\":false, \"retweeted\":false, \"possibly_sensitive\":false, \"lang\":\"en\" }" - val tweetStatusObj = TwitterObjectFactory.createStatus(tweetJSON) - // simple mapping from primitives to dynamic Solr fields using reflection - val doc: SolrInputDocument = SolrSupport.autoMapToSolrInputDoc("tweet-" + tweetStatusObj.getId, tweetStatusObj, null) - logger.info("Mapped to Document: " + doc.toString) - - assert(doc.containsKey("createdAt_tdt")) - assert(doc.containsKey("lang_s")) - assert(doc.containsKey("favoriteCount_i")) - assert(doc.containsKey("source_s")) - assert(doc.containsKey("retweeted_b")) - assert(doc.containsKey("retweetCount_i")) - assert(doc.containsKey("inReplyToStatusId_l")) - } - -} From ee8a71eb1f3f764141c56e2b7320dc751620fdd3 Mon Sep 17 00:00:00 2001 From: Timothy Potter Date: Mon, 22 Jun 2020 09:52:57 -0600 Subject: [PATCH 2/9] us to_timestamp --- .../spark/TestPartitionByTimeQuerySupport.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala index 4f626950..5e69369d 100644 --- a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala +++ b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala @@ -1,5 +1,7 @@ package com.lucidworks.spark +import java.util.TimeZone + import com.lucidworks.spark.util.{SolrCloudUtil, SolrSupport} import org.apache.solr.client.solrj.SolrQuery import org.apache.spark.sql.SaveMode._ @@ -25,17 +27,13 @@ class TestPartitionByTimeQuerySupport extends TestSuiteBuilder { val jsonDF = sparkSession.read.json(jsonFileLocation) assert(jsonDF.count == 100) - jsonDF.printSchema() - jsonDF.withColumn("timestamp_tdt", to_utc_timestamp(col("timestamp_tdt"),"UTC")).withColumn("ts", to_timestamp(lit("2014-11-24T17:30"),"yyyy-MM-dd'T'HH:mm")) - .select("timestamp_tdt", "ts").collectAsList().forEach(println(_)) - - var col1DF=jsonDF.filter(jsonDF("timestamp_tdt") >= to_timestamp(lit("2014-11-24T17:30"),"yyyy-MM-dd'T'HH:mm") && jsonDF("timestamp_tdt") < to_timestamp(lit("2014-11-24T17:31"), "yyyy-MM-dd'T'HH:mm")) + var col1DF=jsonDF.filter(jsonDF("timestamp_tdt") >= to_timestamp(lit("2014-11-24T17:30:00Z")) && jsonDF("timestamp_tdt") < to_timestamp(lit("2014-11-24T17:31:00Z"))) assert(col1DF.count == 32) col1DF=col1DF.drop(col1DF("_version_")) - var col2DF=jsonDF.filter(jsonDF("timestamp_tdt") >= "2014-11-24T17:31" && jsonDF("timestamp_tdt") < "2014-11-24T17:32") + var col2DF=jsonDF.filter(jsonDF("timestamp_tdt") >= to_timestamp(lit("2014-11-24T17:31:00Z")) && jsonDF("timestamp_tdt") < to_timestamp(lit("2014-11-24T17:32:00Z"))) assert(col2DF.count == 31) col2DF=col2DF.drop(col2DF("_version_")) - var col3DF=jsonDF.filter(jsonDF("timestamp_tdt") >= "2014-11-24T17:33" && jsonDF("timestamp_tdt") < "2014-11-24T17:34") + var col3DF=jsonDF.filter(jsonDF("timestamp_tdt") >= to_timestamp(lit("2014-11-24T17:33:00Z")) && jsonDF("timestamp_tdt") < to_timestamp(lit("2014-11-24T17:34:00Z"))) assert(col3DF.count == 37) col3DF=col3DF.drop(col3DF("_version_")) From a5a346b107bd0fc44086b24d85486112bcc296bd Mon Sep 17 00:00:00 2001 From: kiranchitturi Date: Mon, 22 Jun 2020 10:34:10 -0700 Subject: [PATCH 3/9] Update project version and shade plugin version --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 
deletions(-) diff --git a/pom.xml b/pom.xml index a78aaca8..d88d09d1 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.lucidworks.spark spark-solr - 3.9.0-SNAPSHOT + 4.0.0-SNAPSHOT jar spark-solr Tools for reading data from Spark into Solr @@ -256,7 +256,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.1.0 + 3.2.4 package From ee7d3021d9d64fcb703e6f7670b0ee62c24ef030 Mon Sep 17 00:00:00 2001 From: Timothy Potter Date: Mon, 22 Jun 2020 11:40:12 -0600 Subject: [PATCH 4/9] Use inferTimestamp to get test passing --- .../spark/TestPartitionByTimeQuerySupport.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala index 5e69369d..78af629a 100644 --- a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala +++ b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala @@ -1,7 +1,5 @@ package com.lucidworks.spark -import java.util.TimeZone - import com.lucidworks.spark.util.{SolrCloudUtil, SolrSupport} import org.apache.solr.client.solrj.SolrQuery import org.apache.spark.sql.SaveMode._ @@ -24,16 +22,16 @@ class TestPartitionByTimeQuerySupport extends TestSuiteBuilder { SolrCloudUtil.buildCollection(zkHost, collection3Name, null, 1, cloudClient, sc) try { val jsonFileLocation = "src/test/resources/test-data/events.json" - val jsonDF = sparkSession.read.json(jsonFileLocation) + val jsonDF = sparkSession.read.option("inferTimestamp", false).json(jsonFileLocation) assert(jsonDF.count == 100) - var col1DF=jsonDF.filter(jsonDF("timestamp_tdt") >= to_timestamp(lit("2014-11-24T17:30:00Z")) && jsonDF("timestamp_tdt") < to_timestamp(lit("2014-11-24T17:31:00Z"))) + var col1DF=jsonDF.filter(jsonDF("timestamp_tdt") >="2014-11-24T17:30" && jsonDF("timestamp_tdt") < "2014-11-24T17:31") assert(col1DF.count == 32) col1DF=col1DF.drop(col1DF("_version_")) - var col2DF=jsonDF.filter(jsonDF("timestamp_tdt") >= to_timestamp(lit("2014-11-24T17:31:00Z")) && jsonDF("timestamp_tdt") < to_timestamp(lit("2014-11-24T17:32:00Z"))) + var col2DF=jsonDF.filter(jsonDF("timestamp_tdt") >="2014-11-24T17:31" && jsonDF("timestamp_tdt") < "2014-11-24T17:32") assert(col2DF.count == 31) col2DF=col2DF.drop(col2DF("_version_")) - var col3DF=jsonDF.filter(jsonDF("timestamp_tdt") >= to_timestamp(lit("2014-11-24T17:33:00Z")) && jsonDF("timestamp_tdt") < to_timestamp(lit("2014-11-24T17:34:00Z"))) + var col3DF=jsonDF.filter(jsonDF("timestamp_tdt") >="2014-11-24T17:33" && jsonDF("timestamp_tdt") < "2014-11-24T17:34") assert(col3DF.count == 37) col3DF=col3DF.drop(col3DF("_version_")) From f564ee5ec1edd9490a060578a3888d43126fb7e7 Mon Sep 17 00:00:00 2001 From: Timothy Potter Date: Mon, 22 Jun 2020 11:41:48 -0600 Subject: [PATCH 5/9] remove import --- .../com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala index 78af629a..b015796c 100644 --- a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala +++ b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala @@ -5,7 +5,6 @@ import org.apache.solr.client.solrj.SolrQuery import org.apache.spark.sql.SaveMode._ import com.lucidworks.spark.util.ConfigurationConstants._ import org.apache.solr.client.solrj.request.CollectionAdminRequest 
-import org.apache.spark.sql.functions._ /** This class is used to test the PartitionByTimeQuerySupport class From ab83577215e33931737a87a004cc405964914405 Mon Sep 17 00:00:00 2001 From: Ian Pointer Date: Mon, 22 Jun 2020 14:05:29 -0400 Subject: [PATCH 6/9] Change `fMeasure()` to `accuracy()` --- .../java/com/lucidworks/spark/example/ml/MLPipeline.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java b/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java index 904d1c04..e87623be 100644 --- a/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java +++ b/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java @@ -218,9 +218,8 @@ public int run(SparkConf conf, CommandLine cli) throws Exception { System.out.println(confusionMatrix); // compute the false positive rate per label - //System.out.println(); - // TODO: Spark3 - //System.out.println("F-Measure: "+metrics.fMeasure()); + + System.out.println("Accuracy: "+metrics.accuracy()); System.out.println("label\tfpr\n"); String[] labels = labelConverter.getLabels(); From efd017987c68fd0f3ed7ad781507688b32f42464 Mon Sep 17 00:00:00 2001 From: Timothy Potter Date: Mon, 22 Jun 2020 12:13:46 -0600 Subject: [PATCH 7/9] Remove DocFilter support --- .../spark/filter/DocFilterContext.java | 21 -- .../spark/util/EmbeddedSolrServerFactory.java | 152 -------------- .../lucidworks/spark/util/SolrSupport.scala | 72 +------ .../spark/solr/TestEmbeddedSolrServer.java | 47 ----- src/test/resources/custom-solrconfig.xml | 185 ------------------ 5 files changed, 2 insertions(+), 475 deletions(-) delete mode 100644 src/main/java/com/lucidworks/spark/filter/DocFilterContext.java delete mode 100644 src/main/java/com/lucidworks/spark/util/EmbeddedSolrServerFactory.java delete mode 100644 src/test/java/com/lucidworks/spark/solr/TestEmbeddedSolrServer.java delete mode 100644 src/test/resources/custom-solrconfig.xml diff --git a/src/main/java/com/lucidworks/spark/filter/DocFilterContext.java b/src/main/java/com/lucidworks/spark/filter/DocFilterContext.java deleted file mode 100644 index 02275ccd..00000000 --- a/src/main/java/com/lucidworks/spark/filter/DocFilterContext.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.lucidworks.spark.filter; - -import org.apache.commons.cli.CommandLine; -import org.apache.solr.client.solrj.SolrQuery; -import org.apache.solr.common.SolrInputDocument; -import org.apache.spark.streaming.api.java.JavaStreamingContext; - -import java.io.Serializable; -import java.util.List; - -/** - * Used by the document filtering framework to delegate the loading and - * of queries used to filter documents and what to do when a document - * matches a query. 
- */ -public interface DocFilterContext extends Serializable { - void init(JavaStreamingContext jssc, CommandLine cli); - String getDocIdFieldName(); - List getQueries(); - void onMatch(SolrQuery query, SolrInputDocument inputDoc); -} diff --git a/src/main/java/com/lucidworks/spark/util/EmbeddedSolrServerFactory.java b/src/main/java/com/lucidworks/spark/util/EmbeddedSolrServerFactory.java deleted file mode 100644 index 01019618..00000000 --- a/src/main/java/com/lucidworks/spark/util/EmbeddedSolrServerFactory.java +++ /dev/null @@ -1,152 +0,0 @@ -package com.lucidworks.spark.util; - -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.common.cloud.ZkConfigManager; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.core.CoreContainer; -import org.apache.solr.core.CoreDescriptor; -import org.apache.solr.core.SolrCore; -import org.apache.solr.core.SolrResourceLoader; - -import org.apache.log4j.Logger; - -import org.apache.commons.io.FileUtils; - -/** - * Supports one or more embedded Solr servers in the same JVM - */ -public class EmbeddedSolrServerFactory implements Serializable { - - private static final Logger log = Logger.getLogger(EmbeddedSolrServerFactory.class); - - public static final EmbeddedSolrServerFactory singleton = new EmbeddedSolrServerFactory(); - - private transient Map servers = new HashMap(); - - public synchronized EmbeddedSolrServer getEmbeddedSolrServer(String zkHost, String collection) { - return getEmbeddedSolrServer(zkHost, collection, null, null); - } - - public synchronized EmbeddedSolrServer getEmbeddedSolrServer(String zkHost, String collection, String solrConfigXml, String solrXml) { - String key = zkHost+"/"+collection; - - EmbeddedSolrServer solr = servers.get(key); - if (solr == null) { - try { - solr = bootstrapEmbeddedSolrServer(zkHost, collection, solrConfigXml, solrXml); - } catch (Exception exc) { - if (exc instanceof RuntimeException) { - throw (RuntimeException) exc; - } else { - throw new RuntimeException(exc); - } - } - servers.put(key, solr); - } - return solr; - } - - private EmbeddedSolrServer bootstrapEmbeddedSolrServer(String zkHost, String collection, String solrConfigXml, String solrXml) throws Exception { - - CloudSolrClient cloudClient = SolrSupport.getCachedCloudClient(zkHost); - cloudClient.connect(); - - ZkStateReader zkStateReader = cloudClient.getZkStateReader(); - if (!zkStateReader.getClusterState().hasCollection(collection)) - throw new IllegalStateException("Collection '"+collection+"' not found!"); - - String configName = zkStateReader.readConfigName(collection); - if (configName == null) - throw new IllegalStateException("No configName found for Collection: "+collection); - - File tmpDir = FileUtils.getTempDirectory(); - File solrHomeDir = new File(tmpDir, "solr"+System.currentTimeMillis()); - - log.info("Setting up embedded Solr server in local directory: "+solrHomeDir.getAbsolutePath()); - - FileUtils.forceMkdir(solrHomeDir); - - writeSolrXml(solrHomeDir, solrXml); - - String coreName = "embedded"; - - File instanceDir = new File(solrHomeDir, coreName); - FileUtils.forceMkdir(instanceDir); - - File confDir = new File(instanceDir, "conf"); - ZkConfigManager zkConfigManager = - new ZkConfigManager(cloudClient.getZkStateReader().getZkClient()); - 
zkConfigManager.downloadConfigDir(configName, confDir.toPath()); - if (!confDir.isDirectory()) - throw new IOException("Failed to download /configs/"+configName+" from ZooKeeper!"); - - writeSolrConfigXml(confDir, solrConfigXml); - - log.info(String.format("Attempting to bootstrap EmbeddedSolrServer instance in dir: %s", - instanceDir.getAbsolutePath())); - - SolrResourceLoader solrResourceLoader = - new SolrResourceLoader(solrHomeDir.toPath()); - CoreContainer coreContainer = new CoreContainer(solrResourceLoader); - coreContainer.load(); - - SolrCore core = coreContainer.create(coreName, instanceDir.toPath(), Collections.emptyMap(), false); - return new EmbeddedSolrServer(coreContainer, coreName); - } - - protected File writeSolrConfigXml(File confDir, String solrConfigXml) throws IOException { - if (solrConfigXml != null && !solrConfigXml.trim().isEmpty()) { - return writeClasspathResourceToLocalFile(solrConfigXml, new File(confDir, "solrconfig.xml")); - } else { - return writeClasspathResourceToLocalFile("embedded/solrconfig.xml", new File(confDir, "solrconfig.xml")); - } - } - - protected File writeSolrXml(File solrHomeDir, String solrXml) throws IOException { - if (solrXml != null && !solrXml.trim().isEmpty()) { - return writeClasspathResourceToLocalFile(solrXml, new File(solrHomeDir, "solr.xml")); - } else { - return writeClasspathResourceToLocalFile("embedded/solr.xml", new File(solrHomeDir, "solr.xml")); - } - } - - protected File writeClasspathResourceToLocalFile(String resourceId, File destFile) throws IOException { - InputStreamReader isr = null; - OutputStreamWriter osw = null; - int r = 0; - char[] ach = new char[1024]; - try { - InputStream in = getClass().getClassLoader().getResourceAsStream(resourceId); - if (in == null) - throw new IOException("Resource "+resourceId+" not found on classpath!"); - - isr = new InputStreamReader(in, StandardCharsets.UTF_8); - osw = new OutputStreamWriter(new FileOutputStream(destFile), StandardCharsets.UTF_8); - while ((r = isr.read(ach)) != -1) osw.write(ach, 0, r); - osw.flush(); - } finally { - if (isr != null) { - try { - isr.close(); - } catch (Exception ignoreMe){ - ignoreMe.printStackTrace(); - } - } - if (osw != null) { - try { - osw.close(); - } catch (Exception ignoreMe){ - ignoreMe.printStackTrace(); - } - } - } - return destFile; - } -} diff --git a/src/main/scala/com/lucidworks/spark/util/SolrSupport.scala b/src/main/scala/com/lucidworks/spark/util/SolrSupport.scala index eeb7e80c..8ab15466 100644 --- a/src/main/scala/com/lucidworks/spark/util/SolrSupport.scala +++ b/src/main/scala/com/lucidworks/spark/util/SolrSupport.scala @@ -5,22 +5,18 @@ import java.lang.reflect.Modifier import java.net.{ConnectException, InetAddress, SocketException, URL} import java.nio.file.{Files, Paths} import java.util.Date -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger import java.util.regex.Pattern import com.google.common.cache._ -import com.lucidworks.spark.filter.DocFilterContext import com.lucidworks.spark.fusion.FusionPipelineClient import com.lucidworks.spark.util.SolrSupport.{CloudClientParams, ShardInfo} import com.lucidworks.spark.{LazyLogging, SolrReplica, SolrShard, SparkSolrAccumulator} import org.apache.http.NoHttpResponseException import org.apache.solr.client.solrj.impl._ import org.apache.solr.client.solrj.request.UpdateRequest -import org.apache.solr.client.solrj.response.QueryResponse -import org.apache.solr.client.solrj.{SolrClient, SolrQuery, SolrServerException} +import 
org.apache.solr.client.solrj.{SolrClient, SolrQuery} import org.apache.solr.common.cloud._ -import org.apache.solr.common.{SolrDocument, SolrException, SolrInputDocument} +import org.apache.solr.common.{SolrException, SolrInputDocument} import org.apache.solr.common.params.ModifiableSolrParams import org.apache.solr.common.util.SimpleOrderedMap import org.apache.spark.rdd.RDD @@ -568,70 +564,6 @@ object SolrSupport extends LazyLogging { None } - def filterDocuments( - filterContext: DocFilterContext, - zkHost: String, - collection: String, - docs: DStream[SolrInputDocument]): DStream[SolrInputDocument] = { - val partitionIndex = new AtomicInteger(0) - val idFieldName = filterContext.getDocIdFieldName - - docs.mapPartitions(solrInputDocumentIterator => { - val startNano: Long = System.nanoTime() - val partitionId: Int = partitionIndex.incrementAndGet() - - val partitionFq: String = "docfilterid_i:" + partitionId - // TODO: Can this be used concurrently? probably better to have each partition check it out from a pool - val solr = EmbeddedSolrServerFactory.singleton.getEmbeddedSolrServer(zkHost, collection) - - // index all docs in this partition, then match queries - var numDocs: Int = 0 - val inputDocs: mutable.Map[String, SolrInputDocument] = new mutable.HashMap[String, SolrInputDocument] - while (solrInputDocumentIterator.hasNext) { - numDocs += 1 - val doc: SolrInputDocument = solrInputDocumentIterator.next() - doc.setField("docfilterid_i", partitionId) - solr.add(doc) - inputDocs.put(doc.getFieldValue(idFieldName).asInstanceOf[String], doc) - } - solr.commit - - for (q: SolrQuery <- filterContext.getQueries) { - val query = q.getCopy - query.setFields(idFieldName) - query.setRows(inputDocs.size) - query.addFilterQuery(partitionFq) - - var queryResponse: Option[QueryResponse] = None - try { - queryResponse = Some(solr.query(query)) - } - catch { - case e: SolrServerException => - throw new RuntimeException(e) - } - - if (queryResponse.isDefined) { - for (doc: SolrDocument <- queryResponse.get.getResults) { - val docId: String = doc.getFirstValue(idFieldName).asInstanceOf[String] - val inputDoc = inputDocs.get(docId) - if (inputDoc.isDefined) filterContext.onMatch(q, inputDoc.get) - } - - solr.deleteByQuery(partitionFq, 100) - val durationNano: Long = System.nanoTime - startNano - - logger.debug("Partition " + partitionId + " took " + TimeUnit.MILLISECONDS.convert(durationNano, TimeUnit.NANOSECONDS) + "ms to process " + numDocs + " docs") - for (inputDoc <- inputDocs.values) { - inputDoc.removeField("docfilterid_i") - } - } - } - - inputDocs.valuesIterator - }) - } - def buildShardList(zkHost: String, collection: String, shardsTolerant: Boolean): List[SolrShard] = { val solrClient = getCachedCloudClient(zkHost) val zkStateReader: ZkStateReader = solrClient.getZkStateReader diff --git a/src/test/java/com/lucidworks/spark/solr/TestEmbeddedSolrServer.java b/src/test/java/com/lucidworks/spark/solr/TestEmbeddedSolrServer.java deleted file mode 100644 index cd052f14..00000000 --- a/src/test/java/com/lucidworks/spark/solr/TestEmbeddedSolrServer.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.lucidworks.spark.solr; - -import com.lucidworks.spark.RDDProcessorTestBase; -import com.lucidworks.spark.util.EmbeddedSolrServerFactory; -import org.apache.solr.client.solrj.SolrQuery; -import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer; -import org.apache.solr.client.solrj.response.QueryResponse; -import org.junit.Test; - -public class TestEmbeddedSolrServer extends RDDProcessorTestBase { - - 
@Test - public void testEmbeddedSolrServer() throws Exception { - String testCollection = "testEmbeddedSolrServer"; - EmbeddedSolrServer embeddedSolrServer = null; - try { - String zkHost = cluster.getZkServer().getZkAddress(); - buildCollection(zkHost, testCollection, 10, 1); - embeddedSolrServer = EmbeddedSolrServerFactory.singleton.getEmbeddedSolrServer(zkHost, testCollection); - QueryResponse queryResponse = embeddedSolrServer.query(new SolrQuery("*:*")); - assert(queryResponse.getStatus() == 0); - } finally { - if (embeddedSolrServer != null) { - embeddedSolrServer.close(); - } - deleteCollection(testCollection); - } - } - - @Test - public void testEmbeddedSolrServerCustomConfig() throws Exception { - String testCollection = "testEmbeddedSolrServerConfig"; - EmbeddedSolrServer embeddedSolrServer = null; - try { - String zkHost = cluster.getZkServer().getZkAddress(); - buildCollection(zkHost, testCollection, 10, 1); - embeddedSolrServer = EmbeddedSolrServerFactory.singleton.getEmbeddedSolrServer(zkHost, testCollection, "custom-solrconfig.xml", null); - QueryResponse queryResponse = embeddedSolrServer.query(new SolrQuery("*:*")); - assert(queryResponse.getStatus() == 0); - } finally { - if (embeddedSolrServer != null) { - embeddedSolrServer.close(); - } - deleteCollection(testCollection); - } - } -} diff --git a/src/test/resources/custom-solrconfig.xml b/src/test/resources/custom-solrconfig.xml deleted file mode 100644 index d282e7e1..00000000 --- a/src/test/resources/custom-solrconfig.xml +++ /dev/null @@ -1,185 +0,0 @@ - - - 7.5.0 - ${solr.data.dir:} - - - - single - false - - - - ${solr.ulog.dir:} - ${solr.ulog.numVersionBuckets:65536} - - - ${solr.autoCommit.maxTime:15000} - false - - - -1 - - - - 1024 - - - - - - - true - 20 - 100 - - - - - - - - - - - - true - - - - - - - - - 5000 - tagger_text - LONGEST_DOMINANT_RIGHT - json - - - - - - explicit - 10 - - - - - - explicit - json - true - - - - - - _text_ - - - - - - string - - - - - - explicit - - - elevator - - - - - - - [^\w-\.] - _ - - - - - - - yyyy-MM-dd'T'HH:mm:ss.SSSZ - yyyy-MM-dd'T'HH:mm:ss,SSSZ - yyyy-MM-dd'T'HH:mm:ss.SSS - yyyy-MM-dd'T'HH:mm:ss,SSS - yyyy-MM-dd'T'HH:mm:ssZ - yyyy-MM-dd'T'HH:mm:ss - yyyy-MM-dd'T'HH:mmZ - yyyy-MM-dd'T'HH:mm - yyyy-MM-dd HH:mm:ss.SSSZ - yyyy-MM-dd HH:mm:ss,SSSZ - yyyy-MM-dd HH:mm:ss.SSS - yyyy-MM-dd HH:mm:ss,SSS - yyyy-MM-dd HH:mm:ssZ - yyyy-MM-dd HH:mm:ss - yyyy-MM-dd HH:mmZ - yyyy-MM-dd HH:mm - yyyy-MM-dd - - - - - java.lang.String - string - true - - - java.lang.Boolean - booleans - - - java.util.Date - pdates - - - java.lang.Long - java.lang.Integer - plongs - - - java.lang.Number - pdoubles - - - - - - - - - - - - text/plain; charset=UTF-8 - - From d823ba252674dbd7797c2811d640b1c8540d7e5f Mon Sep 17 00:00:00 2001 From: kiranchitturi Date: Mon, 22 Jun 2020 15:10:37 -0700 Subject: [PATCH 8/9] downgrade maven plugin for CI --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d88d09d1..f60cfde2 100644 --- a/pom.xml +++ b/pom.xml @@ -166,7 +166,7 @@ net.alchim31.maven scala-maven-plugin - 4.4.0 + 4.0.0 eclipse-add-source From 15ec64a68069e1e181d1d6a57e970da2688e8fc4 Mon Sep 17 00:00:00 2001 From: kiranchitturi Date: Mon, 22 Jun 2020 15:11:42 -0700 Subject: [PATCH 9/9] Revert "downgrade maven plugin for CI" This reverts commit d823ba252674dbd7797c2811d640b1c8540d7e5f. 
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index f60cfde2..d88d09d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
       <plugin>
         <groupId>net.alchim31.maven</groupId>
         <artifactId>scala-maven-plugin</artifactId>
-        <version>4.0.0</version>
+        <version>4.4.0</version>
         <executions>
           <execution>
             <id>eclipse-add-source</id>
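
Porting note: the `TODO: Spark3` blocks in SelectSolrRDD.scala and StreamingSolrRDD.scala comment out the per-task logging instead of migrating it. After the move from Scala 2.11 to 2.12, the single-lambda form of `TaskContext.addTaskCompletionListener` typically stops compiling because it becomes ambiguous with the `TaskCompletionListener` SAM overload; pinning the type argument resolves the ambiguity. Below is a minimal, self-contained sketch of that workaround, assuming Spark 3.0.x and Scala 2.12; the object name TaskListenerSketch, the local[2] master, and the println logging are illustrative only, not code from this repository:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object TaskListenerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("listener-sketch").setMaster("local[2]"))
    val total = sc.parallelize(1 to 100, 4).mapPartitions { iter =>
      // The explicit [Unit] type argument disambiguates the (TaskContext => U) overload
      // from the TaskCompletionListener SAM overload under Scala 2.12.
      TaskContext.get().addTaskCompletionListener[Unit] { ctx =>
        println(s"partition ${ctx.partitionId()} finished")
      }
      iter
    }.count()
    println(s"counted $total rows")
    sc.stop()
  }
}

The same [Unit] type argument (or an explicit org.apache.spark.util.TaskCompletionListener instance) should allow the commented-out logger.info calls in the two RDD classes to compile again against the Spark 3 API. The analogous TODO left in MLPipelineScala.scala could likewise print `metrics.accuracy`, mirroring the change patch 6/9 already makes in MLPipeline.java.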