Flink Learning Journey: Word Count with Flink and Kafka
2019-05-15 02:16:26
Tags: Flink, Kafka, word count

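Preface

Because of work requirements, I spent a few days learning Flink and Kafka. This post shows how to integrate Flink with Kafka to count words.

Basic Kafka commands

I won't introduce Kafka itself here; a quick search will cover that. I'll only list a few basic Kafka commands, because the project uses them.

Installing Kafka

I'm on a Mac, so installation is easy: brew install kafka installs Kafka and ZooKeeper in one step, including unpacking and environment setup.

Starting the environment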

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

Producer

1. Change to the Kafka installation directory
cd /usr/local/Cellar/kafka/2.2.0/
2. Create a topic
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafka-first-topic
3. List the topics
./bin/kafka-topics --list --zookeeper localhost:2181
4. Start a console producer
./bin/kafka-console-producer --broker-list localhost:9092 --topic kafka-first-topic

Consumer

1. Change to the Kafka installation directory
cd /usr/local/Cellar/kafka/2.2.0/
2. Start a console consumer
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka-first-topic --from-beginning

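Hands-on

Preparation

First, create a Kafka topic named flink-windows, using the same kafka-topics --create command as above with --topic flink-windows.

Connecting Flink to Kafka

Add the dependency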

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

The Flink code

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class FlinkCostKafka {


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);


        DataStream<Tuple2<String, Integer>> counts = null;
		
        // Kafka connection settings
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("zookeeper.connect", "127.0.0.1:2181");
        properties.setProperty("group.id", "flink-windows");

        FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("flink-windows", new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env.addSource(myConsumer);

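        // Split each line into (word, 1) pairs, key by the word (field 0), group into
        // processing-time session windows that close after a 2-second gap,
        // then sum the counts (field 1).
        counts = stream
                .flatMap(new LineSplitter())
                .keyBy(0)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
                .sum(1);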

        counts.print().setParallelism(1);

        env.execute("FlinkCostKafka");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

}
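
To make the windowing step easier to picture, here is a minimal plain-Java sketch of what the job above does: tokenize, key by word, group into sessions separated by a gap, and sum. It is a simulation, not Flink code. The names SessionWordCountSketch, TimedLine, tokenize and sessionCounts are hypothetical, arrival times are passed in by hand instead of coming from the processing-time clock, and a session is treated as closed once the gap has fully elapsed (Flink's exact boundary handling may differ).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of flatMap -> keyBy(word) -> session window -> sum. Not Flink code. */
final class SessionWordCountSketch {

    /** One input line plus the (simulated) time in milliseconds at which it arrived. */
    static final class TimedLine {
        final long arrivalMillis;
        final String text;

        TimedLine(long arrivalMillis, String text) {
            this.arrivalMillis = arrivalMillis;
            this.text = text;
        }
    }

    /** Same rule as LineSplitter: lower-case, split on non-word characters, drop empty tokens. */
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    /**
     * Returns one "word=count" entry per session. The input must be sorted by arrival time.
     * Assumption: a word's session ends when gapMillis or more passes without that word.
     * The order of the returned entries is an artifact of this sketch, not of Flink.
     */
    static List<String> sessionCounts(List<TimedLine> lines, long gapMillis) {
        Map<String, Long> lastSeen = new LinkedHashMap<>();
        Map<String, Integer> openSessions = new LinkedHashMap<>();
        List<String> results = new ArrayList<>();

        for (TimedLine line : lines) {
            for (String word : tokenize(line.text)) {
                Long last = lastSeen.get(word);
                if (last != null && line.arrivalMillis - last >= gapMillis) {
                    // The previous session for this word has expired: emit it and start a new one.
                    results.add(word + "=" + openSessions.remove(word));
                }
                openSessions.merge(word, 1, Integer::sum);
                lastSeen.put(word, line.arrivalMillis);
            }
        }
        // End of input: emit every session that is still open.
        for (Map.Entry<String, Integer> entry : openSessions.entrySet()) {
            results.add(entry.getKey() + "=" + entry.getValue());
        }
        return results;
    }

    public static void main(String[] args) {
        List<TimedLine> lines = Arrays.asList(
                new TimedLine(0L, "hello flink"),
                new TimedLine(500L, "Hello, Kafka"),
                new TimedLine(5000L, "hello again"));
        // 2000 ms mirrors Time.seconds(2) in the job above.
        for (String result : sessionCounts(lines, 2000L)) {
            System.out.println(result);
        }
    }
}
```

In this sample input, "hello" arrives twice within 2 seconds and then again much later, so it is reported as two separate sessions. This is why the same word can be printed more than once by the real job when messages arrive in bursts.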

Writing data to Kafka

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerTest implements Runnable {

    private final KafkaProducer<String, String> producer;
    private final String topic;
    public KafkaProducerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<String, String>(props);
        this.topic = topicName;
    }

    @Override
    public void run() {
        int messageNo = 1;
        try {
            for(;;) {
                String messageStr = "hello,this is " + messageNo + " messages";
                producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
                // Log every 100th message
                if (messageNo % 100 == 0) {
                    System.out.println("Sent message: " + messageStr);
                }
                // Stop after 1000 messages
                if (messageNo % 1000 == 0) {
                    System.out.println("Successfully sent " + messageNo + " messages");
                    break;
                }
                messageNo++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    public static void main(String args[]) {
        KafkaProducerTest test = new KafkaProducerTest("flink-windows");
        Thread thread = new Thread(test);
        thread.start();
    }
}
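
As a sanity check for the Flink output, the totals this producer should lead to can be worked out without Kafka or Flink. The sketch below rebuilds the same 1000 message strings and applies the same splitting rule as LineSplitter. The class name ExpectedWordCounts and the method countWords are hypothetical. It also assumes that all messages for a word fall into a single session; if the 2-second gap splits them, Flink reports the same totals spread over several outputs.

```java
import java.util.Map;
import java.util.TreeMap;

/** Computes the word totals expected from the messages KafkaProducerTest sends. No Kafka needed. */
final class ExpectedWordCounts {

    /** Builds messages 1..messageCount in the producer's format and counts their tokens. */
    static Map<String, Integer> countWords(int messageCount) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int messageNo = 1; messageNo <= messageCount; messageNo++) {
            String message = "hello,this is " + messageNo + " messages";
            // Same tokenization as LineSplitter: lower-case, split on non-word characters.
            for (String token : message.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords(1000);
        System.out.println("hello -> " + counts.get("hello"));
        System.out.println("42 -> " + counts.get("42"));
        System.out.println("distinct tokens: " + counts.size());
    }
}
```

Each of the four fixed words (hello, this, is, messages) should total 1000, and each message number from 1 to 1000 appears once as its own token, giving 1004 distinct tokens.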

The complete project code is on my GitHub for reference. Writing this up took some effort, so a star would be appreciated.

