博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python3+spark2.1+kafka0.8+sparkStreaming
阅读量:7237 次
发布时间:2019-06-29

本文共 1133 字,大约阅读时间需要 3 分钟。

python代码:

import timefrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom pyspark.streaming.kafka import KafkaUtilsfrom operator import addsc = SparkContext(master="local[1]",appName="PythonSparkStreamingRokidDtSnCount")ssc = StreamingContext(sc, 2)zkQuorum = 'localhost:2181'topic = {
'rokid':1}groupid = "test-consumer-group"lines = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)lines1 = lines.flatMap(lambda x: x.split("\n"))valuestr = lines1.map(lambda x: x.value.decode())valuedict = valuestr.map(lambda x:eval(x))message = valuedict.map(lambda x: x["message"])rdd2 = message.map(lambda x: (time.strftime("%Y-%m-%d",time.localtime(float(x.split("\u0001")[0].split("\u0002")[1])/1000))+"|"+x.split("\u0001")[1].split("\u0002")[1],1)).map(lambda x: (x[0],x[1]))rdd3 = rdd2.reduceByKey(add)rdd3.saveAsTextFiles("/tmp/wordcount")rdd3.pprint()ssc.start()ssc.awaitTermination()

执行SparkStreaming:

spark/bin/spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar ReadFromKafkaStreaming.py

 

其中spark-streaming-kafka-0.98-assembly_2.11-2.1.0.jar从以下网站下载

http://search.maven.org

 

作为入门参考。

转载地址:http://zngfm.baihongyu.com/

你可能感兴趣的文章
【UESTC 482】Charitable Exchange(优先队列+bfs)
查看>>
通过VS2010的内存分析工具来分析程序性能问题
查看>>
mini-cygwin
查看>>
如何能低成本地快速获取大量目标用户,而不是与竞争对手持久战?
查看>>
三分钟教你同步 Visual Studio Code 设置
查看>>
程序员,你是选择25k的996还是18k的8小时工作日?
查看>>
Socket编程入门(基于Java实现)
查看>>
RX第一章
查看>>
DOM0级和DOM2级事件
查看>>
iOS Client 与WebSocket 通信(二)(转)
查看>>
网易考拉海购Java后台开发实习-面经(已拿offer)
查看>>
React-Router看这里
查看>>
打造一个通用的 RecyclerView Adapter
查看>>
基于redis的秒杀
查看>>
js如何实现上拉加载更多...
查看>>
.Net Core Logger 实现log写入本地文件系统
查看>>
Java Servlet关键点详解
查看>>
深入分析luait反编译之luajit-decomp
查看>>
从头编写 asp.net core 2.0 web api 基础框架 (5) EF CRUD
查看>>
【我们一起写框架】MVVM的WPF框架(五)—完结篇
查看>>