# Samza

If you're using Samza to process your event stream, you have two options for sending a stream to Druid:

- Use Samza's built-in Kafka support to write messages to a Kafka topic, then use Druid's Kafka 0.8 support to read from that same topic. This method does not involve Tranquility at all.
- Use Tranquility's built-in Samza SystemFactory to send data to Druid over HTTP.

Since this is the Tranquility documentation, we'll talk about the second method. You can set up a Samza job with the following properties:

```
systems.druid.samza.factory: com.metamx.tranquility.samza.BeamSystemFactory
systems.druid.beam.factory: com.example.MyBeamFactory
systems.druid.beam.batchSize: 2000 # Optional; we'll send batches of this size to Druid
systems.druid.beam.maxPendingBatches: 5 # Optional; maximum number of in-flight batches before the producer blocks
```
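
Your Samza task then writes to Druid by sending messages to the `druid` system like any other output stream. As a rough sketch (the task class name, output stream name, and message shape here are assumptions, not part of Tranquility's API), the producing side might look like this:

```java
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class MyDruidWriterTask implements StreamTask
{
  // "druid" matches the system name from the properties above. The stream name
  // ("mydatasource" is just a placeholder) becomes the Druid dataSource, since the
  // BeamFactory below uses stream.getStream() as the dataSource.
  private static final SystemStream DRUID_STREAM = new SystemStream("druid", "mydatasource");

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
  {
    // Assumes upstream messages are already Map<String, Object> events containing the
    // "timestamp" field that the Timestamper in the BeamFactory below reads.
    final Map<String, Object> event = (Map<String, Object>) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(DRUID_STREAM, event));
  }
}
```

Messages sent this way are batched according to `systems.druid.beam.batchSize` and `systems.druid.beam.maxPendingBatches`, then forwarded to Druid over HTTP.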

To make this work, you need to implement com.example.MyBeamFactory. It should return a Beam<Object>. See the Configuration documentation for details.

For example:

```java
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.samza.BeamFactory;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.samza.config.Config;
import org.apache.samza.system.SystemStream;
import org.joda.time.DateTime;
import org.joda.time.Period;

import java.util.List;
import java.util.Map;

public class MyBeamFactory implements BeamFactory
{

  @Override
  public Beam<Object> makeBeam(SystemStream stream, Config config)
  {
    final String zkConnect = "localhost";
    final String dataSource = stream.getStream();

    final List<String> dimensions = ImmutableList.of("column");
    final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
        new CountAggregatorFactory("cnt")
    );

    // The Timestamper should return the timestamp of the events your Samza task produces. Samza envelopes carry
    // plain Objects, so you'll generally have to cast them here.
    final Timestamper<Object> timestamper = new Timestamper<Object>()
    {
      @Override
      public DateTime timestamp(Object obj)
      {
        final Map<String, Object> theMap = (Map<String, Object>) obj;
        return new DateTime(theMap.get("timestamp"));
      }
    };

    final CuratorFramework curator = CuratorFrameworkFactory.builder()
                                                            .connectString(zkConnect)
                                                            .retryPolicy(new ExponentialBackoffRetry(500, 15, 10000))
                                                            .build();
    curator.start();

    return DruidBeams
        .builder(timestamper)
        .curator(curator)
        .discoveryPath("/druid/discovery")
        .location(DruidLocation.create("druid/overlord", "druid:firehose:%s", dataSource))
        .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularities.MINUTE))
        .tuning(
            ClusteredBeamTuning.builder()
                               .segmentGranularity(Granularity.HOUR)
                               .windowPeriod(new Period("PT10M"))
                               .build()
        )
        .buildBeam();
  }
}
```
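
A few notes on this example: the Druid dataSource is taken from the Samza output stream name (`stream.getStream()`), `segmentGranularity(Granularity.HOUR)` means Tranquility creates hourly segments, and `windowPeriod(new Period("PT10M"))` means events with timestamps more than ten minutes away from the current time are dropped rather than indexed.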