
Reading log data with Flume and writing it to Kafka, then integrating Kafka with Storm

I. Flume configuration

Flume 1.6 or later is required (the Kafka sink first shipped with 1.6).
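You can confirm the installed version with Flume's built-in subcommand:

bin/flume-ng version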

Contents of flume-conf.properties; the sink acts as a Kafka producer:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/airib/work/log.log

# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


Start Flume:

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
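To smoke-test the Flume-to-Kafka leg, append a line to the tailed file and watch it arrive on the test topic (assuming ZooKeeper and Kafka are already running on localhost, as set up in the next section):

echo "hello flume" >> /home/airib/work/log.log
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning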


II. Kafka consumer Java source code

package com.hgp.kafka.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", "localhost:2181");
        // the consumer group this consumer belongs to
        props.put("group.id", "jd-group");
        // ZooKeeper timeouts
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("test").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext())
            System.out.println(it.next().message());
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
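One way to run this consumer is through Maven's exec plugin, assuming the class lives in the TestStorm project whose pom.xml appears further below (a sketch, not part of the original article):

mvn compile exec:java -Dexec.mainClass=com.hgp.kafka.kafka.KafkaConsumer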

Kafka startup commands

Start the ZooKeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties &


Start the Kafka server:

bin/kafka-server-start.sh config/server.properties &


Run a console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test


Run a console consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning






III. Kafka + Storm integration example

package com.hgp.kafka.kafka;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple input) {
            String line = input.getString(0);
            LOG.info("RECV[kafka -> splitter] " + line);
            String[] words = line.split("\\s+");
            for (String word : words) {
                LOG.info("EMIT[splitter -> counter] " + word);
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class WordCounter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(WordCounter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECV[splitter -> counter] " + word + " : " + count);
            AtomicInteger ai = this.counterMap.get(word);
            if (ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: " + this.counterMap);
        }

        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while (iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
                LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        String zks = "localhost:2181";
        String topic = "test";
        String zkRoot = "/storm"; // default ZooKeeper root configuration for Storm
        String id = "word";
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = true;
        spoutConf.zkServers = Arrays.asList(new String[] {"localhost"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        // the Kafka topic was created with 5 partitions, so the spout parallelism is set to 5
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();
        String name = MyKafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from the command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            // local-mode execution
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
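The topology runs in local mode when started without arguments; passing the Nimbus host as the first argument submits it to a cluster. A sketch, assuming the jar comes from the TestStorm project below (the exact jar name depends on your build):

storm jar target/TestStorm-0.0.1-SNAPSHOT.jar com.hgp.kafka.kafka.MyKafkaTopology nimbus-host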


pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ymm</groupId>
    <artifactId>TestStorm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>TestStorm</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.10.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>
</project>
IV. Storm deployment

1) Build the jar: mvn clean package

2) Upload it to the Storm cluster and submit: storm jar xxx.jar com.sss.class






1. ZooKeeper

Installation reference

2. Kafka

2.1 Unpack and install

# make sure Scala is already installed; this article uses 2.11.7
tar -xf kafka_2.11-0.9.0.1.tgz
cd kafka_2.11-0.9.0.1
mkdir logs

vim ~/.bash_profile

export KAFKA_HOME=/home/zkpk/kafka_2.11-0.9.0.1
export PATH=$PATH:$KAFKA_HOME/bin

source ~/.bash_profile

2.2 Configuration

2.2.1 server.properties

Only the following four settings are changed; everything else is left at its default.

# unique ID of this machine in the cluster; same idea as ZooKeeper's myid
broker.id=0

host.name=hsm01

# directory where messages are stored; this can be a comma-separated list of
# directories, in which case num.io.threads should be at least the number of
# directories; when a new topic is created, its messages are persisted to
# whichever listed directory currently holds the fewest partitions
log.dirs=/home/zkpk/kafka_2.11-0.9.0.1/logs

# point Kafka at the custom ZooKeeper ensemble (note the /kafka chroot)
zookeeper.connect=hsm01:2181,hss01:2181,hss02:2181/kafka

2.2.2 Copy to the other nodes

scp -r ~/kafka_2.11-0.9.0.1/ hss01:~/
scp -r ~/kafka_2.11-0.9.0.1/ hss02:~/

# adjust broker.id and host.name on each node
# configure the environment variables there as well
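Those per-node edits can be scripted; a sketch, assuming the same install path on every node:

ssh hss01 "sed -i 's/^broker.id=0/broker.id=1/; s/^host.name=hsm01/host.name=hss01/' ~/kafka_2.11-0.9.0.1/config/server.properties"
ssh hss02 "sed -i 's/^broker.id=0/broker.id=2/; s/^host.name=hsm01/host.name=hss02/' ~/kafka_2.11-0.9.0.1/config/server.properties"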

2.3 Start

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

2.4 Testing

# create a topic
kafka-topics.sh --create --zookeeper hsm01:2181/kafka --replication-factor 1 --partitions 1 --topic shuaige

# start a console producer
kafka-console-producer.sh --broker-list hsm01:9092 --topic shuaige

# start a console consumer
kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic shuaige --from-beginning

# list topics
kafka-topics.sh --zookeeper hsm01:2181/kafka --list

# describe a topic
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

# delete a topic
kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

2.5 References

Kafka (Part 1): Building a Kafka cluster

3. Flume

3.1 Unpack and install

# in the /home/zkpk directory
tar -xf apache-flume-1.6.0-bin.tar.gz
mv apache-flume-1.6.0-bin/ flume-1.6.0

# configure the environment variables
vim .bash_profile

export FLUME_HOME=/home/zkpk/flume-1.6.0
export PATH=$PATH:$FLUME_HOME/bin

3.2 Configuration (integration with Kafka)


The Kafka sink is only available in Flume 1.6.0 and later.

3.2.1 flume-env.sh

JAVA_HOME=/opt/jdk1.8.0_45

3.2.2 kafka-sogolog.properties

# configure agent
a1.sources = f1
a1.channels = c1
a1.sinks = k1

# configure the source
a1.sources.f1.type = netcat
a1.sources.f1.bind = localhost
a1.sources.f1.port = 3333

# configure the sink (kafka)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = sogolog
a1.sinks.k1.brokerList = hsm01:9092,hss01:9092
a1.sinks.k1.requiredAcks = 0
a1.sinks.k1.batchSize = 20

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and sink to the channel
a1.sources.f1.channels = c1
a1.sinks.k1.channel = c1

3.3 Start

Start the ZooKeeper service:

$ZOOKEEPER_HOME/bin/zkServer.sh start

Start Kafka:

# start the broker
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# create the topic
kafka-topics.sh --create --zookeeper hsm01:2181/kafka --replication-factor 1 --partitions 1 --topic sogolog

# start a console consumer
kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic sogolog --from-beginning

Start Flume:

flume-ng agent -n a1 -c conf -f conf/kafka-sogolog.properties -Dflume.root.logger=DEBUG,console


Note: a1 in the command is the agent name defined in the configuration file, and the -f argument must give the exact path to that configuration file (here conf/kafka-sogolog.properties).

3.4 Testing

[Screenshot: telnet input]

[Screenshot: Flume collecting the data]

[Screenshot: Kafka receiving the data]
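With the netcat source bound to localhost:3333, test input can be sent with telnet; each line typed becomes one Flume event and should appear on the sogolog topic:

telnet localhost 3333
# then type a few lines, e.g.
# hello storm-kafka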

3.5 References

Highly available Hadoop platform: Flume NG in practice (illustrated)

Installing Flume on Linux, and common problems

Flume NG 1.6 + Kafka 2.11 integration

Custom EventSerializer class for Flume's HBase sink

Flume 1.6.0 User Guide

org/apache/flume/tools/GetJavaProperty

4. Storm

4.1 Installation

Storm installation reference

4.2 A simple test

4.2.1 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>bigdata-demo</artifactId>
        <groupId>com.zw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>storm-demo</artifactId>
    <packaging>jar</packaging>

    <name>storm-demo</name>
    <url>http://maven.apache.org</url>

    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
        <repository>
            <id>twitter4j</id>
            <url>http://twitter4j.org/maven2</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <storm.version>0.9.7</storm.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!-- provided for cluster submission; switch to compile to run locally -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>com.theoryinpractise</groupId>
                <artifactId>clojure-maven-plugin</artifactId>
                <version>1.3.8</version>
                <extensions>true</extensions>
                <configuration>
                    <sourceDirectories>
                        <sourceDirectory>src/clj</sourceDirectory>
                    </sourceDirectories>
                </configuration>
                <executions>
                    <execution>
                        <id>compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test</id>
                        <phase>test</phase>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>


Note the scope of the storm-core dependency: provided for cluster submission, compile for running locally.

4.2.2 HelloWorldSpout.java

package com.zw.storm.helloworld;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * A Spout talks to the outside world: it can pull data from a database
 * according to some rule, or take tasks from a distributed queue.
 * <p>
 * This one emits a tuple generated from a random number.
 * <p>
 * Created by zhangws on 16/10/3.
 */
public class HelloWorldSpout extends BaseRichSpout {

    // collector used to emit tuples
    private SpoutOutputCollector collector;

    private int referenceRandom;

    private static final int MAX_RANDOM = 10;

    public HelloWorldSpout() {
        final Random rand = new Random();
        referenceRandom = rand.nextInt(MAX_RANDOM);
    }

    /**
     * Declares the field id. The id is unused in simple mode, but matters a lot
     * when grouping by field.
     * <p>
     * The declarer can also define a stream id via declarer.declareStream(),
     * which allows more complex stream topologies.
     *
     * @param outputFieldsDeclarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    /**
     * Initializes the collector.
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    /**
     * Each call emits one tuple into the Storm cluster; Storm calls this method continuously.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        final Random rand = new Random();
        int instanceRandom = rand.nextInt(MAX_RANDOM);
        if (instanceRandom == referenceRandom) {
            collector.emit(new Values("Hello World"));
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }
}

4.2.3 HelloWorldBolt.java

package com.zw.storm.helloworld;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Receives the tuples sent by the spout, does some simple processing,
 * and implements the required counting logic.
 * <p>
 * Created by zhangws on 16/10/4.
 */
public class HelloWorldBolt extends BaseBasicBolt {

    private int myCount;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String test = tuple.getStringByField("sentence");
        if ("Hello World".equals(test)) {
            myCount++;
            System.out.println("==========================: " + myCount);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

4.2.4 HelloWorldTopology.java

package com.zw.storm.helloworld;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.zw.storm.helloworld.HelloWorldTopology
 * Created by zhangws on 16/10/4.
 */
public class HelloWorldTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // set the spout and its parallelism; the parallelism controls the number of threads for this component in the cluster
        builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 1);
        // set the processing bolt and its parallelism; shuffle grouping means it receives tuples from the spout at random
        builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 2)
                .shuffleGrouping("randomHelloWorld");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // startup code for running in local mode
            config.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}

4.2.5 Running

# run via Maven
mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.zw.storm.helloworld.HelloWorldTopology

# to run directly as a Java application, change the storm-core
# dependency scope to compile
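For a local run, the dependency from the pom above would change to (same coordinates, scope switched):

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>compile</scope>
</dependency>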

Result:

...
34568 [Thread-15-HelloWorldBolt] INFO  backtype.storm.daemon.executor - Processing received message source: randomHelloWorld:3, stream: default, id: {}, [Other Random Word]
34671 [Thread-11-randomHelloWorld] INFO  backtype.storm.daemon.task - Emitting: randomHelloWorld default [Hello World]
34671 [Thread-15-HelloWorldBolt] INFO  backtype.storm.daemon.executor - Processing received message source: randomHelloWorld:3, stream: default, id: {}, [Hello World]
==========================: 5
34776 [Thread-11-randomHelloWorld] INFO  backtype.storm.daemon.task - Emitting: randomHelloWorld default [Other Random Word]
34776 [Thread-15-HelloWorldBolt] INFO  backtype.storm.daemon.executor - Processing received message source: randomHelloWorld:3, stream: default, id: {}, [Other Random Word]
34880 [Thread-11-randomHelloWorld] INFO  backtype.storm.daemon.task - Emitting: randomHelloWorld default [Other Random Word]
...

4.3 Integration with Kafka

4.3.1 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>bigdata-demo</artifactId>
        <groupId>com.zw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafka2storm</artifactId>
    <packaging>jar</packaging>

    <name>kafka2storm</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <storm.version>0.9.7</storm.version>
        <kafka.version>0.9.0.1</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!-- provided for cluster submission; switch to compile to run locally -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4.3.2 MessageScheme.java

package com.zw.kafka.storm;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * Converts the raw data coming out of Kafka into a string.
 * <p>
 * KafkaSpout is the spout that ships with storm-kafka; to use it you implement
 * the Scheme interface yourself, which is responsible for parsing the needed
 * data out of the message stream.
 * <p>
 * Created by zhangws on 16/10/2.
 */
public class MessageScheme implements Scheme {

    public List<Object> deserialize(byte[] bytes) {
        try {
            // decode the Kafka message payload as UTF-8 text
            String msg = new String(bytes, "UTF-8");
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

4.3.3 SequenceBolt.java

package com.zw.kafka.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * Writes each message it receives to the file kafkastorm.out,
 * then re-emits the message downstream.
 * <p>
 * Created by zhangws on 16/10/2.
 */
public class SequenceBolt extends BaseBasicBolt {

    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     * <p>
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        System.out.println("==============" + word);

        // write the message to a file
        try {
            DataOutputStream out_file = new DataOutputStream(new FileOutputStream("/home/zkpk/kafkastorm.out"));
            out_file.writeUTF(word);
            out_file.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        collector.emit(new Values(word));
    }

    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer used to declare output stream ids, output fields, and
     *                 whether or not each output stream is a direct stream
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

4.3.4 KafkaTopology.java

package com.zw.kafka.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

import java.util.HashMap;
import java.util.Map;

/**
 * Wires the Kafka spout into a Storm topology and submits it.
 * <p>
 * "sogolog" is the topic on which Kafka receives data from the producer;
 * "topic2" is the topic the KafkaBolt writes to, essentially acting as a
 * relay. The "topic" configuration line can be omitted without any problem.
 * <p>
 * Created by zhangws on 16/10/2.
 */
public class KafkaTopology {

    private static final String BROKER_ZK_LIST = "hsm01:2181,hss01:2181,hss02:2181";
    private static final String ZK_PATH = "/kafka/brokers";

    public static void main(String[] args) throws Exception {
        // ZooKeeper ensemble (with the /kafka chroot used by the brokers)
        BrokerHosts brokerHosts = new ZkHosts(BROKER_ZK_LIST, ZK_PATH);
        // topic to subscribe to, plus the ZooKeeper node and id the spout uses to store offsets
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "sogolog", "/kafka", "kafka");

        // kafka.broker.properties consumed by the KafkaBolt
        Config conf = new Config();
        Map<String, String> map = new HashMap<String, String>();
        // Kafka broker address
        map.put("metadata.broker.list", "hsm01:9092");
        // serializer class for messages
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // topic the KafkaBolt writes to
        conf.put("topic", "topic2");

        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        // spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("kafka-bolt", new SequenceBolt()).shuffleGrouping("kafka-spout");
        builder.setBolt("kafka-bolt2", new KafkaBolt()).shuffleGrouping("kafka-bolt");

        String name = KafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from the command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(2);
            StormSubmitter.submitTopology(name, conf, builder.createTopology());
        } else {
            // local-mode execution
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Utils.sleep(60000);
            cluster.killTopology(name);
            cluster.shutdown();
        }
    }
}
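Packaging with the assembly plugin from the pom above produces the fat jar referenced in the run step below; a sketch (the target path is Maven's default output):

mvn clean package
cp target/kafka2storm-1.0-SNAPSHOT-jar-with-dependencies.jar /home/zkpk/doc/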

4.3.5 Copy the Kafka dependency jars into Storm's lib directory

cp ~/kafka_2.11-0.9.0.1/libs/kafka_2.11-0.9.0.1.jar ~/storm-0.9.7/lib/
cp ~/kafka_2.11-0.9.0.1/libs/scala-library-2.11.7.jar ~/storm-0.9.7/lib/
cp ~/kafka_2.11-0.9.0.1/libs/metrics-core-2.2.0.jar ~/storm-0.9.7/lib/
cp ~/kafka_2.11-0.9.0.1/libs/log4j-1.2.17.jar ~/storm-0.9.7/lib/
# cp ~/kafka_2.11-0.9.0.1/libs/slf4j-api-1.7.6.jar ~/storm-0.9.7/lib/
cp ~/kafka_2.11-0.9.0.1/libs/jopt-simple-3.2.jar ~/storm-0.9.7/lib/

4.3.6 Running

Start ZooKeeper and the Storm cluster.
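For example (a sketch, assuming zkServer.sh and storm are on the PATH, with Nimbus and the UI on hsm01 and supervisors on the worker nodes):

zkServer.sh start
storm nimbus &
storm ui &
storm supervisor &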

Start Kafka:

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

Run the KafkaTopology:

storm jar /home/zkpk/doc/kafka2storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.zw.kafka.storm.KafkaTopology hsm01

Start a consumer:

kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic topic2 --from-beginning

Start a Kafka producer:

kafka-console-producer.sh --broker-list hsm01:9092 --topic sogolog

Result:

# producer
[zkpk@hsm01 ~]$ kafka-console-producer.sh --broker-list hsm01:9092 --topic sogolog
nihao
hello storm-kafka
你好,storm-kafka

# consumer, showing the messages after Storm processing
[zkpk@hsm01 ~]$ kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic topic2 --from-beginning
nihao
hello storm-kafka
你好,storm-kafka

4.4 References

Debugging storm-starter in an IDE

A summary of problems when integrating Kafka with Storm

Developing Storm applications that integrate Kafka

log4j problems caused by KafkaSpout


A pom.xml fragment (for Storm 1.0.2) that resolves the log4j conflict above by excluding log4j and slf4j-log4j12 from the offending dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.2.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>1.4.4</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <encoding>utf-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest></manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <encoding>utf-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>

5. Flume, Kafka, and Storm integration test

# start the consumer that reads Storm's output topic
kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic topic2

# run the KafkaTopology
storm jar /home/zkpk/doc/kafka2storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.zw.kafka.storm.KafkaTopology hsm01

# start Flume
flume-ng agent -n a1 -c conf -f /home/zkpk/flume-1.6.0/conf/kafka-sogolog.properties -Dflume.root.logger=DEBUG,console

# copy a file into the directory Flume watches
# (note: kafka-sogolog.properties above defines a netcat source, not a
# spooling-directory source; with that configuration, send test lines via
# telnet localhost 3333 instead)
cp test.log  flume/

Final result:

[Screenshot: final result]