-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Marekhorst 1434 reimplement context importer module by replacing is lookup with RESTful endpoint #1437
Marekhorst 1434 reimplement context importer module by replacing is lookup with RESTful endpoint #1437
Changes from 2 commits
0590742
bad9312
91874ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
package eu.dnetlib.iis.wf.importer.concept; | ||
|
||
import static eu.dnetlib.iis.common.WorkflowRuntimeParameters.OOZIE_ACTION_OUTPUT_FILENAME; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
import org.apache.avro.file.DataFileWriter; | ||
import org.apache.commons.io.IOUtils; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.log4j.Logger; | ||
|
||
import com.google.common.base.Preconditions; | ||
import com.google.gson.Gson; | ||
|
||
import eu.dnetlib.iis.common.counter.NamedCounters; | ||
import eu.dnetlib.iis.common.counter.NamedCountersFileWriter; | ||
import eu.dnetlib.iis.common.java.PortBindings; | ||
import eu.dnetlib.iis.common.java.Process; | ||
import eu.dnetlib.iis.common.java.io.DataStore; | ||
import eu.dnetlib.iis.common.java.io.FileSystemPath; | ||
import eu.dnetlib.iis.common.java.porttype.AvroPortType; | ||
import eu.dnetlib.iis.common.java.porttype.PortType; | ||
import eu.dnetlib.iis.importer.schemas.Concept; | ||
import eu.dnetlib.iis.wf.importer.concept.model.Context; | ||
import eu.dnetlib.iis.wf.importer.concept.model.Param; | ||
import eu.dnetlib.iis.wf.importer.facade.ContextNotFoundException; | ||
import eu.dnetlib.iis.wf.importer.facade.ContextStreamingFacade; | ||
import eu.dnetlib.iis.wf.importer.facade.ServiceFacadeUtils; | ||
|
||
/** | ||
* RESTful endpoint based importer reading contexts provided as JSON records. | ||
* @author mhorst | ||
* | ||
*/ | ||
public class RestfulEndpointBasedConceptImporter implements Process { | ||
|
||
public static final String PARAM_IMPORT_CONTEXT_IDS_CSV = "import.context.ids.csv"; | ||
|
||
protected static final String CONCEPT_COUNTER_NAME = "CONCEPT_COUNTER"; | ||
|
||
private static final Logger log = Logger.getLogger(RestfulEndpointBasedConceptImporter.class); | ||
|
||
private final NamedCountersFileWriter countersWriter = new NamedCountersFileWriter(); | ||
|
||
|
||
protected static final String PORT_OUT_CONCEPTS = "concepts"; | ||
|
||
private final Map<String, PortType> outputPorts = new HashMap<String, PortType>(); | ||
|
||
|
||
//------------------------ CONSTRUCTORS ------------------- | ||
|
||
public RestfulEndpointBasedConceptImporter() { | ||
outputPorts.put(PORT_OUT_CONCEPTS, new AvroPortType(Concept.SCHEMA$)); | ||
} | ||
|
||
//------------------------ LOGIC -------------------------- | ||
|
||
@Override | ||
public Map<String, PortType> getInputPorts() { | ||
return Collections.emptyMap(); | ||
} | ||
|
||
@Override | ||
public Map<String, PortType> getOutputPorts() { | ||
return outputPorts; | ||
} | ||
|
||
@Override | ||
public void run(PortBindings portBindings, Configuration conf, | ||
Map<String, String> parameters) throws Exception { | ||
|
||
Preconditions.checkArgument(parameters.containsKey(PARAM_IMPORT_CONTEXT_IDS_CSV), | ||
"unknown context identifier, required parameter '%s' is missing!", PARAM_IMPORT_CONTEXT_IDS_CSV); | ||
String contextIdsCSV = parameters.get(PARAM_IMPORT_CONTEXT_IDS_CSV); | ||
|
||
try (DataFileWriter<Concept> conceptWriter = getWriter(FileSystem.get(conf), portBindings)) { | ||
|
||
NamedCounters counters = new NamedCounters(new String[] { CONCEPT_COUNTER_NAME }); | ||
|
||
ContextStreamingFacade streamingFacade = ServiceFacadeUtils.instantiate(parameters); | ||
|
||
String[] contextIds = StringUtils.split(contextIdsCSV, ','); | ||
|
||
for (String contextId : contextIds) { | ||
|
||
if (StringUtils.isNotEmpty(contextId)) { | ||
|
||
try { | ||
try (InputStream is = streamingFacade.getStream(contextId)) { | ||
|
||
Concept[] concepts = buildConcepts(IOUtils.toString(is, StandardCharsets.UTF_8.name())); | ||
|
||
for (Concept concept : concepts) { | ||
conceptWriter.append(concept); | ||
} | ||
|
||
counters.increment(CONCEPT_COUNTER_NAME, (long) concepts.length); | ||
} | ||
} catch (ContextNotFoundException e) { | ||
log.warn("context not found: " + contextId, e); | ||
} | ||
} | ||
} | ||
|
||
countersWriter.writeCounters(counters, System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME)); | ||
} | ||
} | ||
|
||
/** | ||
* Provides {@link Concept} writer consuming records. | ||
*/ | ||
protected DataFileWriter<Concept> getWriter(FileSystem fs, PortBindings portBindings) throws IOException { | ||
return DataStore.create( | ||
new FileSystemPath(fs, portBindings.getOutput().get(PORT_OUT_CONCEPTS)), Concept.SCHEMA$); | ||
} | ||
|
||
//------------------------ PRIVATE -------------------------- | ||
|
||
/** | ||
* Builds an array of concepts based on the JSON representation returned by the context endpoint. | ||
* @param contextsJson contexts encoded in JSON format | ||
* @return array of {@link Concept} avro records. | ||
*/ | ||
private static Concept[] buildConcepts(String contextsJson) { | ||
|
||
return translate(new Gson().fromJson(contextsJson, Context[].class)); | ||
} | ||
|
||
/** | ||
* Translates an array of {@link IISConfigurationEntry} from the JSON model into the array of {@link Concept} objects from avro model. | ||
*/ | ||
private static Concept[] translate(Context[] source) { | ||
Concept[] results = new Concept[source.length]; | ||
for (int i=0; i < source.length; i++) { | ||
results[i] = translate(source[i]); | ||
} | ||
return results; | ||
} | ||
|
||
/** | ||
* Translates {@link IISConfigurationEntry} from the JSON model into the {@link Concept} object from avro model. | ||
*/ | ||
private static Concept translate(Context source) { | ||
Concept.Builder conceptBuilder = Concept.newBuilder(); | ||
conceptBuilder.setId(source.getId()); | ||
conceptBuilder.setLabel(source.getLabel()); | ||
conceptBuilder.setParams(translate(source.getParams())); | ||
return conceptBuilder.build(); | ||
} | ||
|
||
/** | ||
* Translates the list of {@link Param} from the JSON model into the list of {@link eu.dnetlib.iis.importer.schemas.Param} objects from avro model. | ||
*/ | ||
private static List<eu.dnetlib.iis.importer.schemas.Param> translate(List<Param> source) { | ||
return source.stream().map(x -> translate(x)).collect(Collectors.toList()); | ||
} | ||
|
||
/** | ||
* Translates {@link Param} from the JSON model into the {@link eu.dnetlib.iis.importer.schemas.Param} object from avro model. | ||
*/ | ||
private static eu.dnetlib.iis.importer.schemas.Param translate(Param source) { | ||
eu.dnetlib.iis.importer.schemas.Param.Builder paramBuilder = eu.dnetlib.iis.importer.schemas.Param.newBuilder(); | ||
paramBuilder.setName(source.getName()); | ||
paramBuilder.setValue(source.getValue()); | ||
return paramBuilder.build(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package eu.dnetlib.iis.wf.importer.concept.model; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
/** | ||
* Class representing context record. | ||
* | ||
* @author mhorst | ||
* | ||
*/ | ||
public class Context { | ||
|
||
private String id; | ||
private String label; | ||
private List<Param> params = new ArrayList<>(); | ||
|
||
public String getId() { | ||
return id; | ||
} | ||
|
||
public void setId(final String id) { | ||
this.id = id; | ||
} | ||
|
||
public String getLabel() { | ||
return label; | ||
} | ||
|
||
public void setLabel(final String label) { | ||
this.label = label; | ||
} | ||
|
||
public List<Param> getParams() { | ||
return params; | ||
} | ||
|
||
public void setParams(final List<Param> params) { | ||
this.params = params; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package eu.dnetlib.iis.wf.importer.concept.model; | ||
|
||
/**
 * Class representing context parameter: a plain name-value pair.
 *
 * Both setters return the instance they were called on, which allows chaining the calls.
 *
 * @author mhorst
 */
public class Param {

    private String name;
    private String value;

    /**
     * @return parameter name, null when not set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @return parameter value, null when not set
     */
    public String getValue() {
        return this.value;
    }

    /**
     * @param name parameter name to be stored
     * @return this instance
     */
    public Param setName(final String name) {
        this.name = name;
        return this;
    }

    /**
     * @param value parameter value to be stored
     * @return this instance
     */
    public Param setValue(final String value) {
        this.value = value;
        return this;
    }

}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package eu.dnetlib.iis.wf.importer.facade; | ||
|
||
/** | ||
* An exception indicating given context was not found. | ||
* @author mhorst | ||
*/ | ||
public class ContextNotFoundException extends ContextStreamingException { | ||
|
||
|
||
private static final long serialVersionUID = -1546075729881700992L; | ||
|
||
public ContextNotFoundException(String contextId) { | ||
super(contextId); | ||
} | ||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package eu.dnetlib.iis.wf.importer.facade; | ||
|
||
/**
 * An exception indicating fatal error when streaming context content.
 *
 * The identifier of the affected context is part of the exception message and is also
 * available through {@link #getContextId()}.
 *
 * @author mhorst
 */
public class ContextStreamingException extends Exception {

    private static final long serialVersionUID = -711409479484772617L;

    /**
     * Fixed part of the exception message, followed by the context identifier.
     */
    private static final String MESSAGE_PREFIX = "Problem occured while streaming context: ";

    /**
     * Context identifier.
     */
    private final String contextId;


    /**
     * @param contextId identifier of the context being streamed
     */
    public ContextStreamingException(String contextId) {
        super(MESSAGE_PREFIX + contextId);
        this.contextId = contextId;
    }

    /**
     * @param contextId identifier of the context being streamed
     * @param e underlying cause of the failure
     */
    public ContextStreamingException(String contextId, Exception e) {
        super(MESSAGE_PREFIX + contextId, e);
        this.contextId = contextId;
    }

    /**
     * @return identifier of the context this exception refers to
     */
    public String getContextId() {
        return this.contextId;
    }

}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package eu.dnetlib.iis.wf.importer.facade; | ||
|
||
import java.io.InputStream; | ||
|
||
/** | ||
* Context API streaming facade. | ||
* @author mhorst | ||
* | ||
*/ | ||
public interface ContextStreamingFacade { | ||
|
||
/** | ||
* Returns stream for a given context identifier. | ||
* @return underlying stream | ||
*/ | ||
InputStream getStream(String contextId) throws ContextNotFoundException, ContextStreamingException; | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
package eu.dnetlib.iis.wf.importer.facade; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.net.HttpURLConnection; | ||
import java.net.MalformedURLException; | ||
import java.net.URL; | ||
|
||
import org.apache.log4j.Logger; | ||
|
||
/** | ||
* Context streaming facade reading data from URL. | ||
* @author mhorst | ||
* | ||
*/ | ||
public class ContextUrlStreamingFacade implements ContextStreamingFacade { | ||
|
||
private static final Logger log = Logger.getLogger(ContextUrlStreamingFacade.class); | ||
|
||
private final String endpointLocation; | ||
|
||
private final int readTimeout; | ||
|
||
private final int connectionTimeout; | ||
|
||
|
||
//------------------------ CONSTRUCTOR -------------------------- | ||
|
||
/**
 * Creates the facade by storing the endpoint location and both timeouts, no connection is opened here.
 *
 * @param endpointLocation stream endpoint URL location
 * @param readTimeout url read timeout, passed to {@link HttpURLConnection#setReadTimeout(int)}
 * @param connectionTimeout url connection timeout, passed to {@link HttpURLConnection#setConnectTimeout(int)}
 * @throws MalformedURLException never thrown by this constructor, which only assigns fields;
 * the clause is part of the declared signature
 */
public ContextUrlStreamingFacade(String endpointLocation,
        int readTimeout, int connectionTimeout) throws MalformedURLException {
    this.connectionTimeout = connectionTimeout;
    this.readTimeout = readTimeout;
    this.endpointLocation = endpointLocation;
}
|
||
//------------------------ LOGIC -------------------------- | ||
|
||
@Override | ||
public InputStream getStream(String contextId) throws ContextNotFoundException, ContextStreamingException { | ||
try { | ||
log.info(String.format("setting timeouts for streaming service: read timeout (%s) and connect timeout (%s)", | ||
this.readTimeout, this.connectionTimeout)); | ||
URL url = new URL(buildUrl(endpointLocation, contextId)); | ||
HttpURLConnection con = (HttpURLConnection) url.openConnection(); | ||
con.setReadTimeout(this.readTimeout); | ||
con.setConnectTimeout(this.connectionTimeout); | ||
if (HttpURLConnection.HTTP_NOT_FOUND == con.getResponseCode()) { | ||
throw new ContextNotFoundException(contextId); | ||
} else { | ||
return con.getInputStream(); | ||
} | ||
} catch (IOException e) { | ||
throw new ContextStreamingException(contextId, e); | ||
} | ||
|
||
} | ||
|
||
//------------------------ PRIVATE -------------------------- | ||
|
||
private static String buildUrl(String endpointLocation, String contextId) { | ||
return endpointLocation + "/" + contextId; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, the best way would be to rely on a
and call it a day. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extracted this method to |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't actually throw the exception.