diff --git a/pom.xml b/pom.xml
index 213d55b7..d88d09d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.lucidworks.spark</groupId>
   <artifactId>spark-solr</artifactId>
-  <version>3.9.0-SNAPSHOT</version>
+  <version>4.0.0-SNAPSHOT</version>
   <packaging>jar</packaging>
   <name>spark-solr</name>
   <description>Tools for reading data from Spark into Solr</description>
@@ -36,14 +36,14 @@
UTF-8
- 1.8
- 2.4.5
+ 11
+ 3.0.0
8.4.1
2.7.5
- 2.10.1
- 2.11.12
- 2.11
- 1.1.1
+ 2.11.0
+ 2.12.10
+ 2.12
+ 1.4.1
128m
@@ -78,7 +78,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-javadoc-plugin</artifactId>
-        <version>2.10.3</version>
+        <version>3.2.0</version>
            <id>attach-javadocs</id>
@@ -98,7 +98,7 @@
verify
sign
-
+
--pinentry-mode
@@ -121,7 +121,7 @@
         <groupId>org.jacoco</groupId>
         <artifactId>jacoco-maven-plugin</artifactId>
-        <version>0.7.5.201505241946</version>
+        <version>0.8.5</version>
           <exclude>com/lucidworks/spark/example/**</exclude>
@@ -156,9 +156,6 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <version>2.18.1</version>
-        <configuration>
-          <argLine>${argLine} -XX:MaxPermSize=${MaxPermSize}</argLine>
-        </configuration>
@@ -169,7 +166,7 @@
         <groupId>net.alchim31.maven</groupId>
         <artifactId>scala-maven-plugin</artifactId>
-        <version>3.2.2</version>
+        <version>4.4.0</version>
            <id>eclipse-add-source</id>
@@ -213,7 +210,7 @@
         <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.0</version>
+        <version>3.8.1</version>
           <source>${java.version}</source>
@@ -222,16 +219,13 @@
           <groupId>org.codehaus.mojo</groupId>
           <artifactId>versions-maven-plugin</artifactId>
-          <version>2.3</version>
+          <version>2.7</version>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-plugin</artifactId>
-          <version>2.18.1</version>
-          <configuration>
-            <argLine>-XX:MaxPermSize=${MaxPermSize}</argLine>
-          </configuration>
+          <version>2.22.2</version>
@@ -262,7 +256,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
-        <version>3.1.0</version>
+        <version>3.2.4</version>
            <phase>package</phase>
@@ -372,7 +366,7 @@
            <pattern>org.apache.http</pattern>
            <shadedPattern>shaded.apache.http</shadedPattern>
-
+
@@ -446,7 +440,7 @@
       <groupId>com.esotericsoftware</groupId>
       <artifactId>kryo-shaded</artifactId>
-      <version>3.0.3</version>
+      <version>4.0.2</version>
       <scope>compile</scope>
@@ -460,20 +454,15 @@
       <artifactId>spark-mllib_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
-    <dependency>
-      <groupId>org.apache.bahir</groupId>
-      <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>
-      <version>2.0.1</version>
-    </dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
-      <version>1.3.1</version>
+      <version>1.4</version>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
-      <version>2.5</version>
+      <version>2.7</version>
       <groupId>org.apache.solr</groupId>
@@ -594,12 +583,12 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
-      <version>3.5</version>
+      <version>3.9</version>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <version>4.12</version>
+      <version>4.13</version>
       <scope>test</scope>
@@ -639,7 +628,7 @@
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <version>3.0.3</version>
+      <version>3.0.8</version>
       <scope>test</scope>
@@ -650,19 +639,20 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
-      <version>1.19</version>
+      <version>1.20</version>
-
+
+
-      <groupId>com.github.tomakehurst</groupId>
-      <artifactId>wiremock</artifactId>
-      <version>1.56</version>
-
-      <classifier>standalone</classifier>
+      <groupId>com.github.tomakehurst</groupId>
+      <artifactId>wiremock-standalone</artifactId>
+      <version>2.26.3</version>
         <groupId>org.mortbay.jetty</groupId>
diff --git a/src/main/java/com/lucidworks/spark/SparkApp.java b/src/main/java/com/lucidworks/spark/SparkApp.java
index 1d76c7de..5a595118 100644
--- a/src/main/java/com/lucidworks/spark/SparkApp.java
+++ b/src/main/java/com/lucidworks/spark/SparkApp.java
@@ -23,8 +23,6 @@
import com.lucidworks.spark.example.hadoop.Logs2SolrRDDProcessor;
import com.lucidworks.spark.example.query.KMeansAnomaly;
import com.lucidworks.spark.example.query.*;
-import com.lucidworks.spark.example.streaming.DocumentFilteringStreamProcessor;
-import com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@@ -241,14 +239,10 @@ private static RDDProcessor newProcessor(String streamProcType) throws Exception
streamProcType = streamProcType.trim();
- if ("twitter-to-solr".equals(streamProcType))
- return new TwitterToSolrStreamProcessor();
- else if ("word-count".equals(streamProcType))
+ if ("word-count".equals(streamProcType))
return new WordCount();
else if ("term-vectors".equals(streamProcType))
return new ReadTermVectors();
- else if ("docfilter".equals(streamProcType))
- return new DocumentFilteringStreamProcessor();
else if ("hdfs-to-solr".equals(streamProcType))
return new HdfsToSolrRDDProcessor();
else if ("logs2solr".equals(streamProcType))
@@ -278,10 +272,8 @@ else if ("eventsim".equals(streamProcType))
private static void displayProcessorOptions(PrintStream out) throws Exception {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("twitter-to-solr", getProcessorOptions(new TwitterToSolrStreamProcessor()));
formatter.printHelp("word-count", getProcessorOptions(new WordCount()));
formatter.printHelp("term-vectors", getProcessorOptions(new ReadTermVectors()));
- formatter.printHelp("docfilter", getProcessorOptions(new DocumentFilteringStreamProcessor()));
formatter.printHelp("hdfs-to-solr", getProcessorOptions(new HdfsToSolrRDDProcessor()));
formatter.printHelp("logs2solr", getProcessorOptions(new Logs2SolrRDDProcessor()));
formatter.printHelp("query-solr-benchmark", getProcessorOptions(new QueryBenchmark()));
diff --git a/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java b/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java
index 36b084be..e87623be 100644
--- a/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java
+++ b/src/main/java/com/lucidworks/spark/example/ml/MLPipeline.java
@@ -218,8 +218,8 @@ public int run(SparkConf conf, CommandLine cli) throws Exception {
System.out.println(confusionMatrix);
// compute the false positive rate per label
- System.out.println();
- System.out.println("F-Measure: "+metrics.fMeasure());
+
+ System.out.println("Accuracy: "+metrics.accuracy());
System.out.println("label\tfpr\n");
String[] labels = labelConverter.getLabels();
diff --git a/src/main/java/com/lucidworks/spark/example/streaming/DocumentFilteringStreamProcessor.java b/src/main/java/com/lucidworks/spark/example/streaming/DocumentFilteringStreamProcessor.java
deleted file mode 100644
index b3ea50e7..00000000
--- a/src/main/java/com/lucidworks/spark/example/streaming/DocumentFilteringStreamProcessor.java
+++ /dev/null
@@ -1,147 +0,0 @@
-package com.lucidworks.spark.example.streaming;
-
-import com.lucidworks.spark.util.SolrSupport;
-import com.lucidworks.spark.SparkApp;
-import com.lucidworks.spark.filter.DocFilterContext;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.log4j.Logger;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.dstream.DStream;
-import org.apache.spark.streaming.twitter.TwitterUtils;
-import twitter4j.Status;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Example showing how to match documents against a set of known queries; useful
- * for doing things like alerts, etc.
- */
-public class DocumentFilteringStreamProcessor extends SparkApp.StreamProcessor {
-
- public static Logger log = Logger.getLogger(DocumentFilteringStreamProcessor.class);
-
- /**
- * A DocFilterContext is responsible for loading queries from some external system and
- * then doing something with each doc that is matched to a query.
- */
- class ExampleDocFilterContextImpl implements DocFilterContext {
-
- public void init(JavaStreamingContext jssc, CommandLine cli) {
- // nothing to init for this basic impl
- }
-
- public String getDocIdFieldName() { return "id"; }
-
- public List<SolrQuery> getQueries() {
- List<SolrQuery> queryList = new ArrayList<SolrQuery>();
-
- // a real impl would pull queries from an external system, such as Solr or a DB or a file
- SolrQuery q1 = new SolrQuery("type_s:post");
- q1.setParam("_qid_", "POSTS"); // identify the query when tagging matching docs
- queryList.add(q1);
-
- SolrQuery q2 = new SolrQuery("type_s:echo");
- q2.setParam("_qid_", "ECHOS");
- queryList.add(q2);
-
- return queryList;
- }
-
- public void onMatch(SolrQuery query, SolrInputDocument inputDoc) {
- String[] qids = query.getParams("_qid_");
- if (qids == null || qids.length < 1) return; // not one of ours
-
- if (log.isDebugEnabled())
- log.debug("document [" + inputDoc.getFieldValue("id") + "] matches query: " + qids[0]);
-
- // just index the matching query for later analysis
- inputDoc.addField("_qid_ss", qids[0]);
- }
- }
-
- public String getName() { return "docfilter"; }
-
- @Override
- public void setup(JavaStreamingContext jssc, CommandLine cli) throws Exception {
-
- // load the DocFilterContext implementation, which knows how to load queries
- DocFilterContext docFilterContext = loadDocFilterContext(jssc, cli);
- final String idFieldName = docFilterContext.getDocIdFieldName();
-
- // start receiving a stream of tweets ...
- String filtersArg = cli.getOptionValue("tweetFilters");
- String[] filters = (filtersArg != null) ? filtersArg.split(",") : new String[0];
- JavaReceiverInputDStream<Status> tweets = TwitterUtils.createStream(jssc, null, filters);
-
- // map incoming tweets into SolrInputDocument objects for indexing in Solr
- JavaDStream<SolrInputDocument> docs = tweets.map(
- new Function<Status, SolrInputDocument>() {
- public SolrInputDocument call(Status status) {
- SolrInputDocument doc =
- SolrSupport.autoMapToSolrInputDoc(idFieldName, "tweet-"+status.getId(), status, null);
- doc.setField("provider_s", "twitter");
- doc.setField("author_s", status.getUser().getScreenName());
- doc.setField("type_s", status.isRetweet() ? "echo" : "post");
- return doc;
- }
- }
- );
-
- // run each doc through a list of filters pulled from our DocFilterContext
- String filterCollection = cli.getOptionValue("filterCollection", collection);
- DStream<SolrInputDocument> enriched =
- SolrSupport.filterDocuments(docFilterContext, zkHost, filterCollection, docs.dstream());
-
- // now index the enriched docs into Solr (or do whatever after the matching process runs)
- SolrSupport.indexDStreamOfDocs(zkHost, collection, batchSize, enriched);
- }
-
- protected DocFilterContext loadDocFilterContext(JavaStreamingContext jssc, CommandLine cli)
- throws Exception
- {
- DocFilterContext ctxt = null;
- String docFilterContextImplClass = cli.getOptionValue("docFilterContextImplClass");
- if (docFilterContextImplClass != null) {
- Class<? extends DocFilterContext> implClass =
- (Class<? extends DocFilterContext>)getClass().getClassLoader().loadClass(docFilterContextImplClass);
- ctxt = implClass.newInstance();
- } else {
- ctxt = new ExampleDocFilterContextImpl();
- }
- ctxt.init(jssc, cli);
- return ctxt;
- }
-
- public Option[] getOptions() {
- return new Option[]{
- OptionBuilder
- .withArgName("LIST")
- .hasArg()
- .isRequired(false)
- .withDescription("List of Twitter keywords to filter on, separated by commas")
- .create("tweetFilters"),
- OptionBuilder
- .withArgName("NAME")
- .hasArg()
- .isRequired(false)
- .withDescription("Collection to pull configuration files to create an " +
- "EmbeddedSolrServer for document matching; defaults to the value of the collection option.")
- .create("filterCollection"),
- OptionBuilder
- .withArgName("CLASS")
- .hasArg()
- .isRequired(false)
- .withDescription("Name of the DocFilterContext implementation class; defaults to an internal example impl: "+
- ExampleDocFilterContextImpl.class.getName())
- .create("docFilterContextImplClass")
- };
- }
-}
diff --git a/src/main/java/com/lucidworks/spark/example/streaming/TwitterToSolrStreamProcessor.java b/src/main/java/com/lucidworks/spark/example/streaming/TwitterToSolrStreamProcessor.java
deleted file mode 100644
index b2005f3a..00000000
--- a/src/main/java/com/lucidworks/spark/example/streaming/TwitterToSolrStreamProcessor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package com.lucidworks.spark.example.streaming;
-
-import com.lucidworks.spark.SparkApp;
-import com.lucidworks.spark.util.SolrSupport;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.log4j.Logger;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.twitter.TwitterUtils;
-import twitter4j.Status;
-
-/**
- * Simple example of indexing tweets into Solr using Spark streaming; be sure to update the
- * twitter4j.properties file on the classpath with your Twitter API credentials.
- */
-public class TwitterToSolrStreamProcessor extends SparkApp.StreamProcessor {
-
- public static Logger log = Logger.getLogger(TwitterToSolrStreamProcessor.class);
-
- public String getName() { return "twitter-to-solr"; }
-
- /**
- * Sends a stream of tweets to Solr.
- */
- @Override
- public void setup(JavaStreamingContext jssc, CommandLine cli) throws Exception {
- String filtersArg = cli.getOptionValue("tweetFilters");
- String[] filters = (filtersArg != null) ? filtersArg.split(",") : new String[0];
-
- // start receiving a stream of tweets ...
- JavaReceiverInputDStream<Status> tweets =
- TwitterUtils.createStream(jssc, null, filters);
-
- String fusionUrl = cli.getOptionValue("fusion");
- if (fusionUrl != null) {
- // just send JSON directly to Fusion
- SolrSupport.sendDStreamOfDocsToFusion(fusionUrl, cli.getOptionValue("fusionCredentials"), tweets.dstream(), batchSize);
- } else {
- // map incoming tweets into PipelineDocument objects for indexing in Solr
- JavaDStream<SolrInputDocument> docs = tweets.map(
- new Function<Status, SolrInputDocument>() {
-
- /**
- * Convert a twitter4j Status object into a SolrJ SolrInputDocument
- */
- public SolrInputDocument call(Status status) {
-
- if (log.isDebugEnabled()) {
- log.debug("Received tweet: " + status.getId() + ": " + status.getText().replaceAll("\\s+", " "));
- }
-
- // simple mapping from primitives to dynamic Solr fields using reflection
- SolrInputDocument doc =
- SolrSupport.autoMapToSolrInputDoc("tweet-"+status.getId(), status, null);
- doc.setField("provider_s", "twitter");
- doc.setField("author_s", status.getUser().getScreenName());
- doc.setField("type_s", status.isRetweet() ? "echo" : "post");
-
- if (log.isDebugEnabled())
- log.debug("Transformed document: " + doc.toString());
- return doc;
- }
- }
- );
-
- // when ready, send the docs into a SolrCloud cluster
- SolrSupport.indexDStreamOfDocs(zkHost, collection, batchSize, docs.dstream());
- }
- }
-
- public Option[] getOptions() {
- return new Option[]{
- OptionBuilder
- .withArgName("LIST")
- .hasArg()
- .isRequired(false)
- .withDescription("List of Twitter keywords to filter on, separated by commas")
- .create("tweetFilters"),
- OptionBuilder
- .withArgName("URL(s)")
- .hasArg()
- .isRequired(false)
- .withDescription("Fusion endpoint")
- .create("fusion"),
- OptionBuilder
- .withArgName("user:password:realm")
- .hasArg()
- .isRequired(false)
- .withDescription("Fusion credentials user:password:realm")
- .create("fusionCredentials")
- };
- }
-}
diff --git a/src/main/java/com/lucidworks/spark/filter/DocFilterContext.java b/src/main/java/com/lucidworks/spark/filter/DocFilterContext.java
deleted file mode 100644
index 02275ccd..00000000
--- a/src/main/java/com/lucidworks/spark/filter/DocFilterContext.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.lucidworks.spark.filter;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Used by the document filtering framework to delegate the loading
- * of queries used to filter documents and what to do when a document
- * matches a query.
- */
-public interface DocFilterContext extends Serializable {
- void init(JavaStreamingContext jssc, CommandLine cli);
- String getDocIdFieldName();
- List<SolrQuery> getQueries();
- void onMatch(SolrQuery query, SolrInputDocument inputDoc);
-}
diff --git a/src/main/java/com/lucidworks/spark/util/EmbeddedSolrServerFactory.java b/src/main/java/com/lucidworks/spark/util/EmbeddedSolrServerFactory.java
deleted file mode 100644
index 01019618..00000000
--- a/src/main/java/com/lucidworks/spark/util/EmbeddedSolrServerFactory.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package com.lucidworks.spark.util;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.core.SolrResourceLoader;
-
-import org.apache.log4j.Logger;
-
-import org.apache.commons.io.FileUtils;
-
-/**
- * Supports one or more embedded Solr servers in the same JVM
- */
-public class EmbeddedSolrServerFactory implements Serializable {
-
- private static final Logger log = Logger.getLogger(EmbeddedSolrServerFactory.class);
-
- public static final EmbeddedSolrServerFactory singleton = new EmbeddedSolrServerFactory();
-
- private transient Map<String, EmbeddedSolrServer> servers = new HashMap<String, EmbeddedSolrServer>();
-
- public synchronized EmbeddedSolrServer getEmbeddedSolrServer(String zkHost, String collection) {
- return getEmbeddedSolrServer(zkHost, collection, null, null);
- }
-
- public synchronized EmbeddedSolrServer getEmbeddedSolrServer(String zkHost, String collection, String solrConfigXml, String solrXml) {
- String key = zkHost+"/"+collection;
-
- EmbeddedSolrServer solr = servers.get(key);
- if (solr == null) {
- try {
- solr = bootstrapEmbeddedSolrServer(zkHost, collection, solrConfigXml, solrXml);
- } catch (Exception exc) {
- if (exc instanceof RuntimeException) {
- throw (RuntimeException) exc;
- } else {
- throw new RuntimeException(exc);
- }
- }
- servers.put(key, solr);
- }
- return solr;
- }
-
- private EmbeddedSolrServer bootstrapEmbeddedSolrServer(String zkHost, String collection, String solrConfigXml, String solrXml) throws Exception {
-
- CloudSolrClient cloudClient = SolrSupport.getCachedCloudClient(zkHost);
- cloudClient.connect();
-
- ZkStateReader zkStateReader = cloudClient.getZkStateReader();
- if (!zkStateReader.getClusterState().hasCollection(collection))
- throw new IllegalStateException("Collection '"+collection+"' not found!");
-
- String configName = zkStateReader.readConfigName(collection);
- if (configName == null)
- throw new IllegalStateException("No configName found for Collection: "+collection);
-
- File tmpDir = FileUtils.getTempDirectory();
- File solrHomeDir = new File(tmpDir, "solr"+System.currentTimeMillis());
-
- log.info("Setting up embedded Solr server in local directory: "+solrHomeDir.getAbsolutePath());
-
- FileUtils.forceMkdir(solrHomeDir);
-
- writeSolrXml(solrHomeDir, solrXml);
-
- String coreName = "embedded";
-
- File instanceDir = new File(solrHomeDir, coreName);
- FileUtils.forceMkdir(instanceDir);
-
- File confDir = new File(instanceDir, "conf");
- ZkConfigManager zkConfigManager =
- new ZkConfigManager(cloudClient.getZkStateReader().getZkClient());
- zkConfigManager.downloadConfigDir(configName, confDir.toPath());
- if (!confDir.isDirectory())
- throw new IOException("Failed to download /configs/"+configName+" from ZooKeeper!");
-
- writeSolrConfigXml(confDir, solrConfigXml);
-
- log.info(String.format("Attempting to bootstrap EmbeddedSolrServer instance in dir: %s",
- instanceDir.getAbsolutePath()));
-
- SolrResourceLoader solrResourceLoader =
- new SolrResourceLoader(solrHomeDir.toPath());
- CoreContainer coreContainer = new CoreContainer(solrResourceLoader);
- coreContainer.load();
-
- SolrCore core = coreContainer.create(coreName, instanceDir.toPath(), Collections.emptyMap(), false);
- return new EmbeddedSolrServer(coreContainer, coreName);
- }
-
- protected File writeSolrConfigXml(File confDir, String solrConfigXml) throws IOException {
- if (solrConfigXml != null && !solrConfigXml.trim().isEmpty()) {
- return writeClasspathResourceToLocalFile(solrConfigXml, new File(confDir, "solrconfig.xml"));
- } else {
- return writeClasspathResourceToLocalFile("embedded/solrconfig.xml", new File(confDir, "solrconfig.xml"));
- }
- }
-
- protected File writeSolrXml(File solrHomeDir, String solrXml) throws IOException {
- if (solrXml != null && !solrXml.trim().isEmpty()) {
- return writeClasspathResourceToLocalFile(solrXml, new File(solrHomeDir, "solr.xml"));
- } else {
- return writeClasspathResourceToLocalFile("embedded/solr.xml", new File(solrHomeDir, "solr.xml"));
- }
- }
-
- protected File writeClasspathResourceToLocalFile(String resourceId, File destFile) throws IOException {
- InputStreamReader isr = null;
- OutputStreamWriter osw = null;
- int r = 0;
- char[] ach = new char[1024];
- try {
- InputStream in = getClass().getClassLoader().getResourceAsStream(resourceId);
- if (in == null)
- throw new IOException("Resource "+resourceId+" not found on classpath!");
-
- isr = new InputStreamReader(in, StandardCharsets.UTF_8);
- osw = new OutputStreamWriter(new FileOutputStream(destFile), StandardCharsets.UTF_8);
- while ((r = isr.read(ach)) != -1) osw.write(ach, 0, r);
- osw.flush();
- } finally {
- if (isr != null) {
- try {
- isr.close();
- } catch (Exception ignoreMe){
- ignoreMe.printStackTrace();
- }
- }
- if (osw != null) {
- try {
- osw.close();
- } catch (Exception ignoreMe){
- ignoreMe.printStackTrace();
- }
- }
- }
- return destFile;
- }
-}
diff --git a/src/main/scala/com/lucidworks/spark/example/events/EventsimIndexer.scala b/src/main/scala/com/lucidworks/spark/example/events/EventsimIndexer.scala
index 202dee8b..6e190ffa 100644
--- a/src/main/scala/com/lucidworks/spark/example/events/EventsimIndexer.scala
+++ b/src/main/scala/com/lucidworks/spark/example/events/EventsimIndexer.scala
@@ -7,7 +7,7 @@ import com.lucidworks.spark.SparkApp.RDDProcessor
import com.lucidworks.spark.fusion.FusionPipelineClient
import org.apache.commons.cli.{CommandLine, Option}
import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.JavaConversions.bufferAsJavaList
import scala.collection.mutable.ListBuffer
@@ -70,7 +70,9 @@ class EventsimIndexer extends RDDProcessor {
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
- sparkSession.read.json(cli.getOptionValue("eventsimJson")).foreachPartition(rows => {
+ val df : DataFrame = sparkSession.read.json(cli.getOptionValue("eventsimJson"))
+
+ df.foreachPartition((rows : Iterator[Row]) => {
val fusion: FusionPipelineClient =
if (fusionAuthEnabled) new FusionPipelineClient(fusionEndpoints, fusionUser, fusionPass, fusionRealm)
else new FusionPipelineClient(fusionEndpoints)
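
Note on the `foreachPartition` change above: on Spark 3 with Scala 2.12, a bare lambda passed to `Dataset.foreachPartition` is ambiguous between the Scala `Iterator[T] => Unit` overload and the Java `ForeachPartitionFunction[T]` overload, so the iterator parameter is now typed explicitly. A minimal, self-contained sketch of the pattern (the object name, data, and column names are illustrative, not taken from EventsimIndexer):

```scala
import org.apache.spark.sql.{Row, SparkSession}

object ForeachPartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("foreachPartition-sketch").getOrCreate()
    import spark.implicits._

    // Illustrative DataFrame; EventsimIndexer reads JSON from disk instead.
    val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("id", "value")

    // Typing the parameter as Iterator[Row] selects the Scala overload on Spark 3 / Scala 2.12.
    df.foreachPartition((rows: Iterator[Row]) => rows.foreach(row => println(row.mkString(","))))

    spark.stop()
  }
}
```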
diff --git a/src/main/scala/com/lucidworks/spark/example/ml/MLPipelineScala.scala b/src/main/scala/com/lucidworks/spark/example/ml/MLPipelineScala.scala
index 453f9744..51e74ae1 100644
--- a/src/main/scala/com/lucidworks/spark/example/ml/MLPipelineScala.scala
+++ b/src/main/scala/com/lucidworks/spark/example/ml/MLPipelineScala.scala
@@ -156,8 +156,9 @@ class MLPipelineScala extends SparkApp.RDDProcessor {
|${metrics.confusionMatrix}\n""".stripMargin)
// compute the false positive rate per label
- println(s"""\nF-Measure: ${metrics.fMeasure}
- |label\tfpr\n""".stripMargin)
+ // TODO: Spark3
+ //println(s"""\nF-Measure: ${metrics.fMeasure}
+ // |label\tfpr\n""".stripMargin)
val labels = labelConverter.getLabels
for (i <- labels.indices)
println(s"${labels(i)}\t${metrics.falsePositiveRate(i.toDouble)}")
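Note on the commented-out block: Spark 3 drops the parameterless `MulticlassMetrics.fMeasure()`, which is presumably why the Java example above now prints `accuracy` instead. Per-label metrics are still available, so the disabled output could be restored along these lines; this is a stand-alone sketch with toy prediction/label pairs, not the pipeline's real predictions, and the object name is hypothetical:

```scala
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.SparkSession

object MulticlassMetricsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("metrics-sketch").getOrCreate()

    // (prediction, label) pairs; purely illustrative values.
    val predictionAndLabels = spark.sparkContext.parallelize(
      Seq((0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (0.0, 1.0), (1.0, 1.0)))
    val metrics = new MulticlassMetrics(predictionAndLabels)

    // accuracy replaces the old no-arg fMeasure(); per-label F-measure and FPR remain available.
    println(s"Accuracy: ${metrics.accuracy}")
    println("label\tfpr\tf1")
    for (label <- metrics.labels)
      println(s"$label\t${metrics.falsePositiveRate(label)}\t${metrics.fMeasure(label)}")

    spark.stop()
  }
}
```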
diff --git a/src/main/scala/com/lucidworks/spark/rdd/SelectSolrRDD.scala b/src/main/scala/com/lucidworks/spark/rdd/SelectSolrRDD.scala
index 326134b3..7e1483c4 100644
--- a/src/main/scala/com/lucidworks/spark/rdd/SelectSolrRDD.scala
+++ b/src/main/scala/com/lucidworks/spark/rdd/SelectSolrRDD.scala
@@ -54,9 +54,10 @@ class SelectSolrRDD(
query.setRequestHandler(solrRequestHandler)
logger.debug(s"Using cursorMarks to fetch documents from ${partition.preferredReplica} for query: ${partition.query}")
val resultsIterator = new StreamingResultsIterator(SolrSupport.getCachedHttpSolrClient(url, zkHost), partition.query, partition.cursorMark)
- context.addTaskCompletionListener { (context) =>
- logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from shard $url for partition ${split.index}")
- }
+ // TODO: Spark3
+ //context.addTaskCompletionListener { (context: TaskContext) =>
+ // logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from shard $url for partition ${split.index}")
+ //}
resultsIterator
}
case p: SolrLimitPartition => {
@@ -69,9 +70,10 @@ class SelectSolrRDD(
query.setStart(null) // important! must start as null else the Iterator will advance the start position by the row size
val resultsIterator = new StreamingResultsIterator(SolrSupport.getCachedCloudClient(p.zkhost), query)
resultsIterator.setMaxSampleDocs(p.maxRows)
- context.addTaskCompletionListener { (context) =>
- logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from the limit (${p.maxRows}) partition of ${p.collection}")
- }
+ // TODO: Spark3
+ //context.addTaskCompletionListener { (context: TaskContext) =>
+ // logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from the limit (${p.maxRows}) partition of ${p.collection}")
+ //}
resultsIterator
}
case partition: AnyRef => throw new Exception("Unknown partition type '" + partition.getClass)
diff --git a/src/main/scala/com/lucidworks/spark/rdd/StreamingSolrRDD.scala b/src/main/scala/com/lucidworks/spark/rdd/StreamingSolrRDD.scala
index 7dcd78f3..baa839e0 100644
--- a/src/main/scala/com/lucidworks/spark/rdd/StreamingSolrRDD.scala
+++ b/src/main/scala/com/lucidworks/spark/rdd/StreamingSolrRDD.scala
@@ -90,9 +90,10 @@ class StreamingSolrRDD(
query.setRequestHandler(solrRequestHandler)
logger.debug(s"Using export handler to fetch documents from ${partition.preferredReplica} for query: ${partition.query}")
val resultsIterator = getExportHandlerBasedIterator(url, query, partition.numWorkers, partition.workerId)
- context.addTaskCompletionListener { (context) =>
- logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from shard $url for partition ${split.index}")
- }
+ // TODO: Spark3
+ //context.addTaskCompletionListener { (context) =>
+ // logger.info(f"Fetched ${resultsIterator.getNumDocs} rows from shard $url for partition ${split.index}")
+ //}
resultsIterator
case partition: AnyRef => throw new Exception("Unknown partition type '" + partition.getClass)
}
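
The listener blocks in SelectSolrRDD and StreamingSolrRDD above are commented out presumably because, under Scala 2.12, the untyped closure no longer resolves cleanly between `addTaskCompletionListener(TaskCompletionListener)` and the generic `addTaskCompletionListener[U](TaskContext => U)`. One way the logging could come back is to pin the type parameter to `[Unit]`; this is a sketch under that assumption, not part of this change, and the object name is hypothetical:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object TaskCompletionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("task-listener-sketch").getOrCreate()
    val sc = spark.sparkContext

    sc.parallelize(1 to 100, 4).mapPartitions { iter =>
      val ctx = TaskContext.get()
      // Pinning [Unit] selects the Scala-function overload and avoids the overload ambiguity.
      ctx.addTaskCompletionListener[Unit] { _ =>
        println(s"partition ${ctx.partitionId()} finished")
      }
      iter
    }.count()

    spark.stop()
  }
}
```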
diff --git a/src/main/scala/com/lucidworks/spark/util/SolrSupport.scala b/src/main/scala/com/lucidworks/spark/util/SolrSupport.scala
index eeb7e80c..8ab15466 100644
--- a/src/main/scala/com/lucidworks/spark/util/SolrSupport.scala
+++ b/src/main/scala/com/lucidworks/spark/util/SolrSupport.scala
@@ -5,22 +5,18 @@ import java.lang.reflect.Modifier
import java.net.{ConnectException, InetAddress, SocketException, URL}
import java.nio.file.{Files, Paths}
import java.util.Date
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import com.google.common.cache._
-import com.lucidworks.spark.filter.DocFilterContext
import com.lucidworks.spark.fusion.FusionPipelineClient
import com.lucidworks.spark.util.SolrSupport.{CloudClientParams, ShardInfo}
import com.lucidworks.spark.{LazyLogging, SolrReplica, SolrShard, SparkSolrAccumulator}
import org.apache.http.NoHttpResponseException
import org.apache.solr.client.solrj.impl._
import org.apache.solr.client.solrj.request.UpdateRequest
-import org.apache.solr.client.solrj.response.QueryResponse
-import org.apache.solr.client.solrj.{SolrClient, SolrQuery, SolrServerException}
+import org.apache.solr.client.solrj.{SolrClient, SolrQuery}
import org.apache.solr.common.cloud._
-import org.apache.solr.common.{SolrDocument, SolrException, SolrInputDocument}
+import org.apache.solr.common.{SolrException, SolrInputDocument}
import org.apache.solr.common.params.ModifiableSolrParams
import org.apache.solr.common.util.SimpleOrderedMap
import org.apache.spark.rdd.RDD
@@ -568,70 +564,6 @@ object SolrSupport extends LazyLogging {
None
}
- def filterDocuments(
- filterContext: DocFilterContext,
- zkHost: String,
- collection: String,
- docs: DStream[SolrInputDocument]): DStream[SolrInputDocument] = {
- val partitionIndex = new AtomicInteger(0)
- val idFieldName = filterContext.getDocIdFieldName
-
- docs.mapPartitions(solrInputDocumentIterator => {
- val startNano: Long = System.nanoTime()
- val partitionId: Int = partitionIndex.incrementAndGet()
-
- val partitionFq: String = "docfilterid_i:" + partitionId
- // TODO: Can this be used concurrently? probably better to have each partition check it out from a pool
- val solr = EmbeddedSolrServerFactory.singleton.getEmbeddedSolrServer(zkHost, collection)
-
- // index all docs in this partition, then match queries
- var numDocs: Int = 0
- val inputDocs: mutable.Map[String, SolrInputDocument] = new mutable.HashMap[String, SolrInputDocument]
- while (solrInputDocumentIterator.hasNext) {
- numDocs += 1
- val doc: SolrInputDocument = solrInputDocumentIterator.next()
- doc.setField("docfilterid_i", partitionId)
- solr.add(doc)
- inputDocs.put(doc.getFieldValue(idFieldName).asInstanceOf[String], doc)
- }
- solr.commit
-
- for (q: SolrQuery <- filterContext.getQueries) {
- val query = q.getCopy
- query.setFields(idFieldName)
- query.setRows(inputDocs.size)
- query.addFilterQuery(partitionFq)
-
- var queryResponse: Option[QueryResponse] = None
- try {
- queryResponse = Some(solr.query(query))
- }
- catch {
- case e: SolrServerException =>
- throw new RuntimeException(e)
- }
-
- if (queryResponse.isDefined) {
- for (doc: SolrDocument <- queryResponse.get.getResults) {
- val docId: String = doc.getFirstValue(idFieldName).asInstanceOf[String]
- val inputDoc = inputDocs.get(docId)
- if (inputDoc.isDefined) filterContext.onMatch(q, inputDoc.get)
- }
-
- solr.deleteByQuery(partitionFq, 100)
- val durationNano: Long = System.nanoTime - startNano
-
- logger.debug("Partition " + partitionId + " took " + TimeUnit.MILLISECONDS.convert(durationNano, TimeUnit.NANOSECONDS) + "ms to process " + numDocs + " docs")
- for (inputDoc <- inputDocs.values) {
- inputDoc.removeField("docfilterid_i")
- }
- }
- }
-
- inputDocs.valuesIterator
- })
- }
-
def buildShardList(zkHost: String, collection: String, shardsTolerant: Boolean): List[SolrShard] = {
val solrClient = getCachedCloudClient(zkHost)
val zkStateReader: ZkStateReader = solrClient.getZkStateReader
diff --git a/src/test/java/com/lucidworks/spark/fusion/FusionPipelineClientTest.java b/src/test/java/com/lucidworks/spark/fusion/FusionPipelineClientTest.java
index e76f5528..e9238047 100644
--- a/src/test/java/com/lucidworks/spark/fusion/FusionPipelineClientTest.java
+++ b/src/test/java/com/lucidworks/spark/fusion/FusionPipelineClientTest.java
@@ -135,15 +135,15 @@ public void testHappyPath() throws Exception {
if (useWireMockRule) {
// mock out the Pipeline API
// stubFor(post(urlEqualTo("/api/apollo/index-pipelines")).willReturn(aResponse().withStatus(200)));
- stubFor(post(urlEqualTo(fusionPipelineUrlWithoutHostAndPort)).willReturn(aResponse().withStatus(200)));
+ stubFor(post(urlEqualTo(fusionPipelineUrlWithoutHostAndPort+"?echo=false")).willReturn(aResponse().withStatus(200)));
stubFor(get(urlEqualTo(fusionProxyBaseUrl + fusionIndexingPipelineUrlExtension)).willReturn(aResponse().withStatus(200).withBody("hello")));
// a bad node in the mix ... to test FusionPipelineClient error handling
- stubFor(post(urlEqualTo(badPath)).willReturn(aResponse().withStatus(500)));
+ stubFor(post(urlEqualTo(badPath+"?echo=false")).willReturn(aResponse().withStatus(500)));
// another bad node in the mix which produces un-authorized errors... to test FusionPipelineClient error handling
- stubFor(post(urlEqualTo(unauthPath)).willReturn(aResponse().withStatus(401)));
+ stubFor(post(urlEqualTo(unauthPath+"?echo=false")).willReturn(aResponse().withStatus(401)));
// mock out the Session API
stubFor(post(urlEqualTo("/api/session?realmName=" + fusionRealm)).willReturn(aResponse().withStatus(200)));
diff --git a/src/test/java/com/lucidworks/spark/solr/TestEmbeddedSolrServer.java b/src/test/java/com/lucidworks/spark/solr/TestEmbeddedSolrServer.java
deleted file mode 100644
index cd052f14..00000000
--- a/src/test/java/com/lucidworks/spark/solr/TestEmbeddedSolrServer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.lucidworks.spark.solr;
-
-import com.lucidworks.spark.RDDProcessorTestBase;
-import com.lucidworks.spark.util.EmbeddedSolrServerFactory;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.junit.Test;
-
-public class TestEmbeddedSolrServer extends RDDProcessorTestBase {
-
- @Test
- public void testEmbeddedSolrServer() throws Exception {
- String testCollection = "testEmbeddedSolrServer";
- EmbeddedSolrServer embeddedSolrServer = null;
- try {
- String zkHost = cluster.getZkServer().getZkAddress();
- buildCollection(zkHost, testCollection, 10, 1);
- embeddedSolrServer = EmbeddedSolrServerFactory.singleton.getEmbeddedSolrServer(zkHost, testCollection);
- QueryResponse queryResponse = embeddedSolrServer.query(new SolrQuery("*:*"));
- assert(queryResponse.getStatus() == 0);
- } finally {
- if (embeddedSolrServer != null) {
- embeddedSolrServer.close();
- }
- deleteCollection(testCollection);
- }
- }
-
- @Test
- public void testEmbeddedSolrServerCustomConfig() throws Exception {
- String testCollection = "testEmbeddedSolrServerConfig";
- EmbeddedSolrServer embeddedSolrServer = null;
- try {
- String zkHost = cluster.getZkServer().getZkAddress();
- buildCollection(zkHost, testCollection, 10, 1);
- embeddedSolrServer = EmbeddedSolrServerFactory.singleton.getEmbeddedSolrServer(zkHost, testCollection, "custom-solrconfig.xml", null);
- QueryResponse queryResponse = embeddedSolrServer.query(new SolrQuery("*:*"));
- assert(queryResponse.getStatus() == 0);
- } finally {
- if (embeddedSolrServer != null) {
- embeddedSolrServer.close();
- }
- deleteCollection(testCollection);
- }
- }
-}
diff --git a/src/test/resources/conf/solrconfig.xml b/src/test/resources/conf/solrconfig.xml
index f6c8525d..b90d425b 100755
--- a/src/test/resources/conf/solrconfig.xml
+++ b/src/test/resources/conf/solrconfig.xml
@@ -1,6 +1,6 @@
- 6.4.2
+ LATEST
${solr.data.dir:}
diff --git a/src/test/resources/custom-solrconfig.xml b/src/test/resources/custom-solrconfig.xml
deleted file mode 100644
index d282e7e1..00000000
--- a/src/test/resources/custom-solrconfig.xml
+++ /dev/null
@@ -1,185 +0,0 @@
-
-
- 7.5.0
- ${solr.data.dir:}
-
-
-
- single
- false
-
-
-
- ${solr.ulog.dir:}
- ${solr.ulog.numVersionBuckets:65536}
-
-
- ${solr.autoCommit.maxTime:15000}
- false
-
-
- -1
-
-
-
- 1024
-
-
-
-
-
-
- true
- 20
- 100
-
-
-
-
-
-
-
-
-
-
-
- true
-
-
-
-
-
-
-
-
- 5000
- tagger_text
- LONGEST_DOMINANT_RIGHT
- json
-
-
-
-
-
- explicit
- 10
-
-
-
-
-
- explicit
- json
- true
-
-
-
-
-
- _text_
-
-
-
-
-
- string
-
-
-
-
-
- explicit
-
-
- elevator
-
-
-
-
-
-
- [^\w-\.]
- _
-
-
-
-
-
-
- yyyy-MM-dd'T'HH:mm:ss.SSSZ
- yyyy-MM-dd'T'HH:mm:ss,SSSZ
- yyyy-MM-dd'T'HH:mm:ss.SSS
- yyyy-MM-dd'T'HH:mm:ss,SSS
- yyyy-MM-dd'T'HH:mm:ssZ
- yyyy-MM-dd'T'HH:mm:ss
- yyyy-MM-dd'T'HH:mmZ
- yyyy-MM-dd'T'HH:mm
- yyyy-MM-dd HH:mm:ss.SSSZ
- yyyy-MM-dd HH:mm:ss,SSSZ
- yyyy-MM-dd HH:mm:ss.SSS
- yyyy-MM-dd HH:mm:ss,SSS
- yyyy-MM-dd HH:mm:ssZ
- yyyy-MM-dd HH:mm:ss
- yyyy-MM-dd HH:mmZ
- yyyy-MM-dd HH:mm
- yyyy-MM-dd
-
-
-
-
- java.lang.String
- string
- true
-
-
- java.lang.Boolean
- booleans
-
-
- java.util.Date
- pdates
-
-
- java.lang.Long
- java.lang.Integer
- plongs
-
-
- java.lang.Number
- pdoubles
-
-
-
-
-
-
-
-
-
-
-
- text/plain; charset=UTF-8
-
-
diff --git a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala
index 3080873d..b015796c 100644
--- a/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala
+++ b/src/test/scala/com/lucidworks/spark/TestPartitionByTimeQuerySupport.scala
@@ -5,7 +5,7 @@ import org.apache.solr.client.solrj.SolrQuery
import org.apache.spark.sql.SaveMode._
import com.lucidworks.spark.util.ConfigurationConstants._
import org.apache.solr.client.solrj.request.CollectionAdminRequest
-import org.apache.solr.client.solrj.request.CollectionAdminRequest.{CreateAlias, DeleteAlias}
+
/**
This class is used to test the PartitionByTimeQuerySupport class
*/
@@ -21,16 +21,16 @@ class TestPartitionByTimeQuerySupport extends TestSuiteBuilder {
SolrCloudUtil.buildCollection(zkHost, collection3Name, null, 1, cloudClient, sc)
try {
val jsonFileLocation = "src/test/resources/test-data/events.json"
- val jsonDF = sparkSession.read.json(jsonFileLocation)
+ val jsonDF = sparkSession.read.option("inferTimestamp", false).json(jsonFileLocation)
assert(jsonDF.count == 100)
- var col1DF=jsonDF.filter(jsonDF("timestamp_tdt") >= "2014-11-24T17:30" && jsonDF("timestamp_tdt") < "2014-11-24T17:31")
+ var col1DF=jsonDF.filter(jsonDF("timestamp_tdt") >="2014-11-24T17:30" && jsonDF("timestamp_tdt") < "2014-11-24T17:31")
assert(col1DF.count == 32)
col1DF=col1DF.drop(col1DF("_version_"))
- var col2DF=jsonDF.filter(jsonDF("timestamp_tdt") >= "2014-11-24T17:31" && jsonDF("timestamp_tdt") < "2014-11-24T17:32")
+ var col2DF=jsonDF.filter(jsonDF("timestamp_tdt") >="2014-11-24T17:31" && jsonDF("timestamp_tdt") < "2014-11-24T17:32")
assert(col2DF.count == 31)
col2DF=col2DF.drop(col2DF("_version_"))
- var col3DF=jsonDF.filter(jsonDF("timestamp_tdt") >= "2014-11-24T17:33" && jsonDF("timestamp_tdt") < "2014-11-24T17:34")
+ var col3DF=jsonDF.filter(jsonDF("timestamp_tdt") >="2014-11-24T17:33" && jsonDF("timestamp_tdt") < "2014-11-24T17:34")
assert(col3DF.count == 37)
col3DF=col3DF.drop(col3DF("_version_"))
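
The added `inferTimestamp` option matters here because Spark 3's JSON schema inference can turn ISO-8601-like strings into timestamp columns, which would break the string range filters used in this test; with inference disabled, `timestamp_tdt` stays a string and lexicographic comparison behaves as it did on Spark 2. A small stand-alone illustration (the path and field name mirror the test; the object name is hypothetical and the count depends on the data):

```scala
import org.apache.spark.sql.SparkSession

object InferTimestampSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("infer-timestamp-sketch").getOrCreate()

    // Keep timestamp-looking strings as strings so string range filters remain valid.
    val events = spark.read.option("inferTimestamp", false).json("src/test/resources/test-data/events.json")

    val window = events.filter(
      events("timestamp_tdt") >= "2014-11-24T17:30" && events("timestamp_tdt") < "2014-11-24T17:31")
    println(s"events in window: ${window.count()}")

    spark.stop()
  }
}
```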
diff --git a/src/test/scala/com/lucidworks/spark/examples/TwitterTestSuite.scala b/src/test/scala/com/lucidworks/spark/examples/TwitterTestSuite.scala
deleted file mode 100644
index d1f5c0a0..00000000
--- a/src/test/scala/com/lucidworks/spark/examples/TwitterTestSuite.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.lucidworks.spark.examples
-
-import com.lucidworks.spark.SparkSolrFunSuite
-import com.lucidworks.spark.util.SolrSupport
-import org.apache.solr.common.SolrInputDocument
-import twitter4j.TwitterObjectFactory
-
-class TwitterTestSuite extends SparkSolrFunSuite {
-
- test("Test twitter field object mapping") {
- val tweetJSON = "{ \"scopes\":{ \"place_ids\":[ \"place one\",\"place two\"]}, \"created_at\":\"Tue Mar 05 23:57:32 +0000 2013\", \"id\":309090333021581313, \"id_str\":\"309090333021581313\", \"text\":\"As announced, @anywhere has been retired per https:\\/\\/t.co\\/bWXjhurvwp The js file now logs a message to the console and exits quietly. ^ARK\", \"source\":\"web\", \"truncated\":false, \"in_reply_to_status_id\":null, \"in_reply_to_status_id_str\":null, \"in_reply_to_user_id\":null, \"in_reply_to_user_id_str\":null, \"in_reply_to_screen_name\":null, \"user\":{ \"id\":6253282, \"id_str\":\"6253282\", \"name\":\"Twitter API\", \"screen_name\":\"twitterapi\", \"location\":\"San Francisco, CA\", \"description\":\"The Real Twitter API. I tweet about API changes, service issues and happily answer questions about Twitter and our API. Don't get an answer? It's on my website.\", \"url\":\"http:\\/\\/dev.twitter.com\", \"entities\":{ \"url\":{ \"urls\":[ { \"url\":\"http:\\/\\/dev.twitter.com\", \"expanded_url\":null, \"indices\":[ 0, 22 ] } ] }, \"description\":{ \"urls\":[ ] } }, \"protected\":false, \"followers_count\":1533137, \"friends_count\":33, \"listed_count\":11369, \"created_at\":\"Wed May 23 06:01:13 +0000 2007\", \"favourites_count\":25, \"utc_offset\":-28800, \"time_zone\":\"Pacific Time (US & Canada)\", \"geo_enabled\":true, \"verified\":true, \"statuses_count\":3392, \"lang\":\"en\", \"contributors_enabled\":true, \"is_translator\":false, \"profile_background_color\":\"C0DEED\", \"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/656927849\\/miyt9dpjz77sc0w3d4vj.png\", \"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/656927849\\/miyt9dpjz77sc0w3d4vj.png\", \"profile_background_tile\":true, \"profile_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_images\\/2284174872\\/7df3h38zabcvjylnyfe3_normal.png\", \"profile_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_images\\/2284174872\\/7df3h38zabcvjylnyfe3_normal.png\", \"profile_banner_url\":\"https:\\/\\/si0.twimg.com\\/profile_banners\\/6253282\\/1347394302\", \"profile_link_color\":\"0084B4\", \"profile_sidebar_border_color\":\"C0DEED\", \"profile_sidebar_fill_color\":\"DDEEF6\", \"profile_text_color\":\"333333\", \"profile_use_background_image\":true, \"default_profile\":false, \"default_profile_image\":false, \"following\":null, \"follow_request_sent\":false, \"notifications\":null }, \"geo\":null, \"coordinates\":null, \"place\":null, \"contributors\":[ 7588892 ], \"retweet_count\":74, \"entities\":{ \"hashtags\":[ ], \"urls\":[ { \"url\":\"https:\\/\\/t.co\\/bWXjhurvwp\", \"expanded_url\":\"https:\\/\\/dev.twitter.com\\/blog\\/sunsetting-anywhere\", \"display_url\":\"dev.twitter.com\\/blog\\/sunsettin…\", \"indices\":[ 45, 68 ] } ], \"user_mentions\":[ { \"screen_name\":\"anywhere\", \"name\":\"Anywhere\", \"id\":9576402, \"id_str\":\"9576402\", \"indices\":[ 14, 23 ] } ] }, \"favorited\":false, \"retweeted\":false, \"possibly_sensitive\":false, \"lang\":\"en\" }"
- val tweetStatusObj = TwitterObjectFactory.createStatus(tweetJSON)
- // simple mapping from primitives to dynamic Solr fields using reflection
- val doc: SolrInputDocument = SolrSupport.autoMapToSolrInputDoc("tweet-" + tweetStatusObj.getId, tweetStatusObj, null)
- logger.info("Mapped to Document: " + doc.toString)
-
- assert(doc.containsKey("createdAt_tdt"))
- assert(doc.containsKey("lang_s"))
- assert(doc.containsKey("favoriteCount_i"))
- assert(doc.containsKey("source_s"))
- assert(doc.containsKey("retweeted_b"))
- assert(doc.containsKey("retweetCount_i"))
- assert(doc.containsKey("inReplyToStatusId_l"))
- }
-
-}