
Unable to set directory or filename dynamically based on the information in a tuple? #26

Open
huxihx opened this issue Mar 19, 2015 · 3 comments

Comments

@huxihx

huxihx commented Mar 19, 2015

Hi there,
Thanks very much for this contribution.
We are designing a log-aggregation system that collects logs from multiple sources and puts all the log lines into a single Kafka topic. With the help of storm-kafka we can consume each log line, but we run into a problem when we try to write each line out to an HDFS file.

It sounds like storm-hdfs only lets us specify the directory and file name once, up front, so we cannot route log lines from different sources to different HDFS files.
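For context, here is roughly how we configure the bolt today (a sketch adapted from the README; the HDFS URL, path, and delimiter are placeholders). The directory and file name come from a FileNameFormat that is fixed when the topology is built, and nothing in that configuration ever sees the tuple, so there is no hook for routing by log source:

```java
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

// Directory and file name are decided here, once, at topology build time.
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/logs/")          // placeholder path
        .withExtension(".log");

// Write tuples as "|"-delimited lines.
RecordFormat recordFormat = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// Sync to HDFS every 1000 tuples; rotate files at 5 MB.
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
FileRotationPolicy rotationPolicy =
        new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://namenode:8020")   // placeholder URL
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(recordFormat)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```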

By the way, we did rewrite a framework similar to yours to work around this problem, but we hit a performance issue from frequently appending to and closing HDFS files, which made us give up on that approach.

Is there any plan for storm-hdfs to support this scenario in the future? Thanks!

@rs-01

rs-01 commented Jun 3, 2015

This would also cover our use case, where we want to partition the data in HDFS based on the time an event was generated. Events can enter the ingestion pipeline several hours, or in some cases a few days, after they are generated. Flume already has this feature; its absence is blocking us from using Storm for the time being.
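For what it's worth, a custom FileNameFormat (prepare / getName / getPath) can already bucket files by time, but only by processing time: getName(long rotation, long timeStamp) and getPath() never see the tuple, so the event time carried in the data cannot influence the partition. A rough sketch of that limitation, with made-up class and path names:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;

// Hypothetical example: buckets output by the hour the file is created
// (processing time). Delayed events still land in the "wrong" hour because
// neither getPath() nor getName() receives the tuple.
public class HourlyPathFileNameFormat implements FileNameFormat {
    private String componentId;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext) {
        this.componentId = topologyContext.getThisComponentId();
    }

    @Override
    public String getName(long rotation, long timeStamp) {
        return this.componentId + "-" + rotation + "-" + timeStamp + ".log";
    }

    @Override
    public String getPath() {
        // Wall-clock hour at file creation, not the hour the event occurred.
        String hour = new SimpleDateFormat("yyyy-MM-dd/HH").format(new Date());
        return "/logs/" + hour + "/";
    }
}
```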

@250690392

Hello, thanks for your contribution.
I hit an error when running SequenceFileTopology; the console prints:
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2038) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:1968) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:1951) ~[hadoop-hdfs-2.6.0.jar:na]
at org.apache.hadoop.fs.FSDataOutputStream.hsync(FSDataOutputStream.java:139) ~[hadoop-common-2.6.0.jar:na]
at org.apache.hadoop.io.SequenceFile$Writer.hsync(SequenceFile.java:1250) ~[hadoop-common-2.6.0.jar:na]
at org.apache.storm.hdfs.bolt.SequenceFileBolt.execute(SequenceFileBolt.java:114) ~[classes/:na]
at backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630) [na:0.9.1-incubating]
at backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398) [na:0.9.1-incubating]
at backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58) [na:0.9.1-incubating]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104) [storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) [storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) [na:0.9.1-incubating]
at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745) [na:0.9.1-incubating]
at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433) [na:0.9.1-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

Could you help me find the cause?

1 similar comment
