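For background on Apache Storm and Apache Kafka, see "Apache Storm 简介" (An Introduction to Apache Storm) and "Apache Kafka 分布式消息队列框架" (Apache Kafka: A Distributed Message Queue Framework), respectively.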
Setting Up the Storm and Kafka Environment
Setting Up the Storm / Kafka Clusters
For the detailed installation steps, see my other blog post "Apache Eagle".
Starting Kafka
- Start the ZooKeeper and Kafka servers
```shell
# Run each command in its own terminal; both stay in the foreground
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
```
- Create a topic
```shell
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-replicated-topic
```
- List topics
```shell
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
```
Sending Messages to Kafka
Writing SendMessageToKafka
- Define the Producer from the properties of the Kafka cluster (broker list and serializer)
- Use Producer.send(KeyedMessage) to send the topic/message pair to Kafka
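```java
// Kafka 0.8.x producer API (assumed from the KeyedMessage/ProducerConfig usage)
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class SendMessageToKafka {

    private static Producer<String, String> producer;

    private static void init() {
        Properties props = new Properties();
        // Not used by the 0.8 producer (a Kafka 0.7 setting); harmless to keep
        props.put("zk.connect", "192.168.1.201:2181");
        // Encode message values as plain strings
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Broker(s) the producer fetches topic metadata from
        props.put("metadata.broker.list", "192.168.1.201:9092");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    public static void main(String... args) {
        init();
        // Topic + message, no key: the producer chooses the partition itself
        KeyedMessage<String, String> data =
                new KeyedMessage<String, String>("my-replicated-topic", "asdf2015");
        producer.send(data);
        producer.close();
    }
}
```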
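Producer.send(KeyedMessage) routes every message to one partition of the topic. The stdlib-only sketch below is not Kafka's code: it shows the hash-and-modulo idea that Kafka 0.8's default partitioner applies to keyed messages, and `HashPartitionSketch` / `partitionFor` are hypothetical names. The message sent above has no key, in which case the 0.8 producer picks a partition on its own instead of hashing.

```java
class HashPartitionSketch {

    // Hypothetical helper: maps a message key to one of numPartitions partitions
    // using the hash-and-modulo idea (not Kafka's exact implementation).
    static int partitionFor(Object key, int numPartitions) {
        // Mask off the sign bit so a negative hashCode still yields a valid index
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition, which keeps per-key ordering
        int first = partitionFor("user-42", 3);
        int second = partitionFor("user-42", 3);
        System.out.println(first == second); // prints true
    }
}
```

Masking the sign bit rather than calling Math.abs avoids a negative result for a hashCode of Integer.MIN_VALUE.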
- Run the main method
```
com.yuzhouwan.hadoop.customer_behaviour_analyse.kafka.SendMessageToKafka
```
- Check the message received by the broker
```shell
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
```
The message asdf2015 should then appear, which confirms that it was sent successfully.
Consuming Messages from Kafka
Writing TestMessageScheme
- In deserialize(byte[]), print each message and return it as a one-field tuple
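```java
// Storm 0.9.x/0.10.x package names (assumed from the byte[] signature);
// Storm 1.x moved these to org.apache.storm.* and deserialize takes a ByteBuffer
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class TestMessageScheme implements Scheme {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);

    // Turns the raw Kafka message bytes into a one-field tuple
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8");
            System.out.println("$$$$$$$$$$$$$$" + msg); // debug print of each received message
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            LOGGER.error("Cannot parse the provided message from bytes.", e);
            throw new RuntimeException(e);
        }
    }

    // ... (getOutputFields(), also required by Scheme)
}
```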
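The core of deserialize can be tried without Storm. This stdlib-only sketch (the class name `Utf8SchemeSketch` is hypothetical) decodes with StandardCharsets.UTF_8, which never throws the checked UnsupportedEncodingException, so the try/catch becomes unnecessary. A plain singleton list stands in for Storm's Values.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

class Utf8SchemeSketch {

    // Hypothetical stand-in for Scheme.deserialize: raw bytes -> one-field tuple
    static List<Object> deserialize(byte[] ser) {
        return Collections.<Object>singletonList(new String(ser, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] raw = "asdf2015".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(raw)); // prints [asdf2015]
    }
}
```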
Writing ShowKafkaMessageBolt
- In prepare(Map, TopologyContext, OutputCollector), keep the OutputCollector (its emit method sends messages downstream) and read attributes such as the component name and task id from the TopologyContext
- Process each message in execute(Tuple)
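```java
// Storm 0.9.x/0.10.x package names (Storm 1.x uses org.apache.storm.*)
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class ShowKafkaMessageBolt implements IRichBolt {

    private OutputCollector collector;
    private String name;
    private int id;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;                // used to emit and ack tuples
        this.name = context.getThisComponentId();  // e.g. "show-message-bolt"
        this.id = context.getThisTaskId();
        System.out.println("Bolt: " + name + " and Id: " + id + " prepared ##################");
    }

    public void execute(Tuple input) {
        // Storm never passes a null tuple, so no null check is needed
        String message = input.getString(0);       // field 0: the String from TestMessageScheme
        System.out.println(message);
        collector.emit(input, new Values(message)); // anchor the new tuple to its input
        collector.ack(input);                       // mark the input as fully processed
    }

    // ...
}
```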
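The emit/ack contract in execute can be checked without a cluster. This is a stdlib-only mock: `SketchCollector`, `RecordingCollector` and `ShowMessageLogicSketch` are hypothetical stand-ins, not Storm's OutputCollector or Tuple. It shows that each input is emitted once (anchored to itself) and acked exactly once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Storm's OutputCollector
interface SketchCollector {
    void emit(Object anchor, List<Object> values);
    void ack(Object input);
}

// Records every call so the bolt logic can be inspected
class RecordingCollector implements SketchCollector {
    final List<List<Object>> emitted = new ArrayList<>();
    final List<Object> acked = new ArrayList<>();

    public void emit(Object anchor, List<Object> values) { emitted.add(values); }
    public void ack(Object input) { acked.add(input); }
}

class ShowMessageLogicSketch {

    // Same shape as ShowKafkaMessageBolt.execute: read field 0, emit anchored, ack once
    static void execute(List<Object> input, SketchCollector collector) {
        String message = (String) input.get(0);
        collector.emit(input, Collections.<Object>singletonList(message));
        collector.ack(input);
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        List<Object> tuple = Collections.<Object>singletonList("This is a message from kafka.");
        execute(tuple, collector);
        System.out.println(collector.emitted.size() + " emitted, " + collector.acked.size() + " acked");
        // prints 1 emitted, 1 acked
    }
}
```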
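Writing BehaviourAnalyse
- Define the BrokerHosts from the ZooKeeper address (ZkHosts looks up the Kafka brokers registered in ZooKeeper)
- Combine the broker hosts, topic, zkRoot, spoutId and TestMessageScheme into a SpoutConfig, then instantiate the KafkaSpout
- Submit the topology with LocalCluster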
```java
// Storm 0.9.x/0.10.x package names; storm-kafka of that era lives in storm.kafka
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class BehaviourAnalyse {

    public static void main(String[] args) {
        // The spout discovers the Kafka brokers through ZooKeeper
        BrokerHosts brokerHosts = new ZkHosts("192.168.1.201:2181");
        String topic = "my-replicated-topic";
        /*
         * zkRoot is a znode path in ZooKeeper under which the spout stores its
         * consumer offsets. It is NOT ZooKeeper's on-disk dataDir from
         * config/zookeeper.properties; "/tmp/zookeeper" simply reuses that value as a name.
         */
        String zkRoot = "/tmp/zookeeper";
        String spoutId = "myKafka";
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        builder.setSpout("kafka-spout", kafkaSpout);
        builder.setBolt("show-message-bolt", new ShowKafkaMessageBolt())
                .shuffleGrouping("kafka-spout");

        Config conf = new Config();
        conf.setDebug(true);
        // At most one un-acked tuple in flight per spout task
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Runs in-process and keeps running until the JVM is stopped
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Show-Message-From-Kafka", conf, builder.createTopology());
    }
}
```
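Together, zkRoot and spoutId decide where the KafkaSpout commits its offsets in ZooKeeper. The sketch below rebuilds that znode path with plain string concatenation. The zkRoot/spoutId/partition_N layout is my reading of storm-kafka 0.9.x and is an assumption worth checking against the version you use; `OffsetPathSketch` and `offsetPath` are hypothetical names.

```java
class OffsetPathSketch {

    // Hypothetical helper: assumed znode layout zkRoot/spoutId/partition_N
    // for the offset of one Kafka partition
    static String offsetPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Values from BehaviourAnalyse, single-partition topic
        System.out.println(offsetPath("/tmp/zookeeper", "myKafka", 0));
        // prints /tmp/zookeeper/myKafka/partition_0
    }
}
```

Because the path depends only on zkRoot and spoutId, two topologies that share both values would read and write the same offsets.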
- Run the main method
```
com.yuzhouwan.hadoop.customer_behaviour_analyse.BehaviourAnalyse
```
- Start a Kafka console producer
```shell
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
```
- Type a sentence and press Enter, for example:
```
This is a message from kafka.
```
The message should then appear in the BehaviourAnalyse console, printed once by TestMessageScheme (with the $$$ prefix) and once by ShowKafkaMessageBolt :-)
Tips
- Get the value of metadata.broker.list from config/producer.properties
```shell
$ vim config/producer.properties
```
- Check which Storm versions are compatible with other frameworks:
http://mvnrepository.com/artifact/org.apache.storm
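Instead of copying metadata.broker.list by hand from config/producer.properties, a program can read it with java.util.Properties. This is a minimal stdlib-only sketch; `BrokerListReader` is a hypothetical name, and an inline string stands in for the real file.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

class BrokerListReader {

    // Reads metadata.broker.list from a properties source such as config/producer.properties
    static String brokerList(Reader source) throws IOException {
        Properties props = new Properties();
        props.load(source);
        return props.getProperty("metadata.broker.list"); // null if the key is absent
    }

    public static void main(String[] args) throws IOException {
        // Inline sample; for the real file use new java.io.FileReader("config/producer.properties")
        String sample = "# list of brokers\nmetadata.broker.list=localhost:9092\n";
        System.out.println(brokerList(new StringReader(sample))); // prints localhost:9092
    }
}
```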