// HybridTopology_csharpSpout_javaCsharpBolt.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;
using Microsoft.SCP;
using Microsoft.SCP.Topology;
namespace Scp.App.HybridTopologyHostMode
{
/// <summary>
/// Hybrid topology example built with TopologyBuilder: a C# spout ("generator") whose
/// output is consumed by both a Java bolt ("java_displayer") and a C# bolt ("csharp-displayer").
/// </summary>
class HybridTopology_csharpSpout_javaCsharpBolt : TopologyDescriptor
{
    /// <summary>
    /// Assembles the hybrid topology: registers the C# spout, the Java bolt and the C# bolt
    /// (both bolts shuffle-grouped on the spout), then attaches the topology configuration.
    /// </summary>
    /// <returns>The populated topology builder.</returns>
    public ITopologyBuilder GetTopologyBuilder()
    {
        var builder = new TopologyBuilder("HybridTopology_csharpSpout_javaCsharpBolt");

        // Customized JSON deserializer used to turn a JSON string into a Java object for the Java bolt.
        // Two full names are required: the Java deserializer class, then the target class to deserialize into.
        var deserializerInfo = new List<string>
        {
            "microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer",
            "microsoft.scp.example.HybridTopology.Person"
        };

        // The spout declares a single field, "person", on the default stream.
        var generatorSchema = new Dictionary<string, List<string>>
        {
            { Constants.DEFAULT_STREAM_ID, new List<string> { "person" } }
        };

        var generatorSpout = builder.SetSpout("generator", Generator.Get, generatorSchema, 1, null);
        generatorSpout.DeclareCustomizedJavaDeserializer(deserializerInfo);

        // Java bolt "java_displayer" consumes the output of the C# spout "generator".
        builder.SetJavaBolt("java_displayer", CreateJavaDisplayerConstructor(), 1)
               .shuffleGrouping("generator");

        // Customized JSON serializer (Java object -> JSON string); only the full name of the
        // Java serializer class is required.
        var serializerInfo = new List<string>
        {
            "microsoft.scp.storm.multilang.CustomizedInteropJSONSerializer"
        };

        // C# bolt "csharp-displayer" also consumes the output of the C# spout "generator".
        // It is registered with an empty output schema.
        var csharpDisplayer = builder.SetBolt(
            "csharp-displayer",
            Displayer.Get,
            new Dictionary<string, List<string>>(),
            1);
        csharpDisplayer
            .DeclareCustomizedJavaSerializer(serializerInfo)
            .shuffleGrouping("generator");

        builder.SetTopologyConfig(CreateTopologyConfig());
        return builder;
    }

    /// <summary>
    /// Describes how to construct the Java bolt class, demonstrating how constructor
    /// parameters of a Java spout/bolt are supplied as (Java type name, value) pairs.
    /// </summary>
    /// <returns>The constructor description for "microsoft.scp.example.HybridTopology.Displayer".</returns>
    private static JavaComponentConstructor CreateJavaDisplayerConstructor()
    {
        // Argument order matters: (int 100, String "test", String empty).
        var constructorArguments = new List<Tuple<string, object>>
        {
            new Tuple<string, object>(JavaComponentConstructor.JAVA_PRIMITIVE_TYPE_INT, 100),
            new Tuple<string, object>(JavaComponentConstructor.JAVA_LANG_STRING, "test"),
            new Tuple<string, object>(JavaComponentConstructor.JAVA_LANG_STRING, string.Empty)
        };

        return new JavaComponentConstructor(
            "microsoft.scp.example.HybridTopology.Displayer",
            constructorArguments);
    }

    /// <summary>
    /// Builds the topology-level configuration used by this example.
    /// </summary>
    /// <returns>The Storm configuration to attach to the topology.</returns>
    private static StormConfig CreateTopologyConfig()
    {
        var stormConfig = new StormConfig();
        stormConfig.setDebug(false);
        stormConfig.setNumWorkers(1);
        stormConfig.setStatsSampleRate(0.05);
        stormConfig.setWorkerChildOps("-Xmx1024m");
        // Arbitrary key/value settings go through Set; the value here is a JSON array literal.
        stormConfig.Set("topology.kryo.register", "[\"[B\"]");
        return stormConfig;
    }
}
}