With PySpark, how do you write key/value pairs to HDFS as a custom string? Sometimes only the value should be written, without the key; sometimes the key and value should be concatenated into one string with no tab separator between them.
The official documentation leaves it unclear what many of the saveAsHadoopFile parameters should be.
Since the output goes to HDFS, the configuration is a Hadoop JobConf configuration, so the Hadoop JobConf settings are what you need to consult. On inspection, the conf argument takes a Python dict.
The key mapreduce.output.textoutputformat.separator configures the separator used by TextOutputFormat; it is read by org.apache.hadoop.mapred.TextOutputFormat.
tf.map(extracmap).saveAsHadoopFile("/tmp/test",outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",conf={'mapreduce.output.textoutputformat.separator':''})
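If only the value is needed, or if the key and value should simply be concatenated with nothing between them, the same effect can also be reached without touching the JobConf at all, by building the output line yourself and using saveAsTextFile. A minimal sketch, assuming an RDD of (key, value) string pairs named rdd and illustrative output paths (not the author's code):

# write only the values, one per line
rdd.map(lambda kv: kv[1]).saveAsTextFile("/tmp/test_values")

# write key and value joined into one string, with no tab in between
rdd.map(lambda kv: kv[0] + kv[1]).saveAsTextFile("/tmp/test_joined")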

The complete example using saveAsHadoopFile is as follows:

#!/usr/bin/python
# -*- coding: UTF-8 -*-

'''
Submit command:
/home/mg/spark-2.2.0/bin/spark-submit --master yarn --num-executors 100 --name pathfilter --queue default good.py 20180128
Local test:
/home/mg/spark-2.2.0/bin/spark2-submit --master local good.py 20181009
'''

from __future__ import print_function
from pyspark.sql.types import *
from pyspark import *
from pyspark.sql import Row
from pyspark.sql import SparkSession
import sys
import re
import json
import time

reload(sys)
sys.setdefaultencoding('utf-8')


# Parse one raw log line into a (json_string, "") pair
def extracmap(line):
    try:
        line_list = line.split(' - {')
        line_json = '{'+line_list[1]
        date_time =  line_list[0].split('|')[0].split('.')
        mobj = json.loads(line_json)
        if mobj:
            tarobj = {}
            tarobj['distinct_id']=mobj.has_key('uid') and mobj['uid'] or '000001'
            tarobj['time'] = mobj.has_key('logTime') and int(time.mktime(time.strptime(mobj['logTime'],'%Y-%m-%d %H:%M:%S'))) or 0
            # put the JSON string in the key and leave the value empty;
            # with an empty separator only the JSON ends up in the output line
            return (json.dumps(tarobj), "")
    except Exception:
        pass

# keep only the lines that parsed successfully (extracmap returns None on failure)
def filtermap(line):
    if line :
        return True
    return False


def mymain(date):
    spark = SparkSession \
        .builder \
        .appName("goodoutput") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    sc = spark.sparkContext

    tf = sc.textFile("/tmp/mylogs/"+date+"/")
    tf.map(extracmap).filter(filtermap).saveAsHadoopFile("/tmp/logs/"+date+"/",outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",conf={'mapreduce.output.textoutputformat.separator':''})

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: good.py <date>", file=sys.stderr)
        exit(-1)
    mymain(sys.argv[1])

Compared with the receiver-based approach, the direct stream approach has the following advantages:
1. Simplified parallelism: RDD partitions are created automatically, one per Kafka partition.
2. Efficiency: data is read from Kafka directly, so no write-ahead log is needed.
3. Exactly-once semantics: duplicate consumption can largely be avoided.

● Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
● Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice – once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
● Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).

Note that one drawback of this approach is that it does not update offsets in Zookeeper; you have to track them yourself.

In the example below, the offsets are stored in a MySQL table, which also makes them easy to inspect.
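The core of that pattern, stripped to its skeleton: create the direct stream (optionally starting from stored offsets) and read each batch's offset ranges inside foreachRDD. A minimal sketch in the spirit of the Spark 1.6 Kafka integration guide; the names stream and save_offsets are illustrative, and persisting to MySQL is left to the full rr.py below:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext("local[2]", "offset-sketch")
ssc = StreamingContext(sc, 10)

# fromOffsets may be None (start from the default position) or a
# {TopicAndPartition: offset} dict loaded from external storage
stream = KafkaUtils.createDirectStream(
    ssc, ["nginx_www"], {"metadata.broker.list": "10.0.4.1:9092"}, fromOffsets=None)

def save_offsets(rdd):
    # each batch RDD of a direct stream carries its own Kafka offset ranges
    for o in rdd.offsetRanges():
        # persist (topic, partition, untilOffset) somewhere durable, e.g. MySQL
        print("%s %d %d %d" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

stream.foreachRDD(save_offsets)
ssc.start()
ssc.awaitTermination()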

/home/work/spark-1.6.0-cdh5.8.0/bin/spark-submit \
--jars /home/work/spark-1.6.0-cdh5.8.0/lib/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar,/home/work/spark-1.6.0-cdh5.8.0/lib/spark-streaming_2.10-1.6.0-cdh5.8.0.jar \
--conf spark.streaming.kafka.maxRatePerPartition=40 \
./rr.py 10.0.4.1:9092 nginx_www true
Usage:
./rr.py brokerlist topic true/false (whether to read the starting offsets from MySQL)
On the first run the MySQL table holds no offsets yet, so pass false as the last argument.
If the job is killed, restart it with true to resume from where it left off.
The database connection settings are configured inside rr.py; the SQL for creating the table is given below the script.

rr.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Store Kafka consumer offsets in MySQL
from __future__ import print_function




import sys
import json
import traceback
import logging
import MySQLdb
import decimal
import urllib2
import time



from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils,TopicAndPartition



FORMAT = '%(asctime)-15s %(message)s'
logger = logging.getLogger('root')
logger.setLevel(logging.DEBUG)


class JSONObject:
    def __init__(self, d):
        self.__dict__ = d


gdbconf = {

    'ddseconds':10,
    'sparkdbconf' : {
        'host': '20.25.194.93',
        'port': 3306,
        'user': '****',
        'passwd': '*****',
        'db': 'mytest',
        'charset': 'utf8'
    }
}

import re




# Process one Kafka record (the message value is element [1] of the tuple)
def processrecord(line):
    import sys
    reload(sys)
    sys.setdefaultencoding("utf-8")
    line = line[1].decode('utf-8').encode('utf-8')

    try:

        theone = dict()
        fields = line.split('|')
        if len(fields)>25:
            return fields[25]

        return None



    except ValueError as e:
        #print(e)
        return None
        #return "【line json decode erro】"+line
    except :
        #print(traceback.format_exc())
        raise
    pass

# Load the stored offsets of a topic from MySQL and build the fromOffsets dict
def getoffset(topic):
    fromOffsets = dict()
    db = MySQLdb.connect(**gdbconf['sparkdbconf'])
    cursor=db.cursor()
    count = cursor.execute("select `partition`,`offset` from sparkstreaming where `topic`='%s' " %(topic))
    if count>=1:
        ofs = cursor.fetchall()
        for o in ofs:
            topicPartion = TopicAndPartition(topic,int(o[0]))
            fromOffsets[topicPartion] = long(o[1])
        return fromOffsets
    else:
        print("no offset found")
        exit(1)
    pass




# Upsert the until-offset of every partition in this batch into MySQL
def updateoffset(rdd):

    if rdd.isEmpty() is False:

        progress = 'logtime'

        db = MySQLdb.connect(**gdbconf['sparkdbconf'])
        db.autocommit(1)

        cursor=db.cursor()
        for o in rdd.offsetRanges():
            print(o.topic)
            print(o.partition, o.fromOffset, o.untilOffset)
            count = cursor.execute("INSERT INTO sparkstreaming (`topic`,`partition`,`offset`,`progress`) VALUES ('%s',%d,%d,'%s')  ON DUPLICATE KEY UPDATE `offset`=%d,`progress`='%s'" %(o.topic,o.partition,o.untilOffset,progress,o.untilOffset,progress))
            if count>=1:
                print("update offset success")
            else:
                print("offset update error")
        pass
        cursor.close()
        db.close()
    else:
        print("rdd is empty no need to update offset")

# Process one batch: aggregate it, persist the offsets, then print the results
def get_output(_, rdd):

    newrdd = rdd.map(processrecord).filter(lambda x: x is not None).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    if newrdd.isEmpty() is False:

        try:
            updateoffset(rdd)
        except :
            traceback.print_exc()
        else:
            pass

        # collect() pulls all of the RDD's data back to the driver
        for jstr in newrdd.collect():
            try:
                print(jstr)
            except:
                print(traceback.format_exc())
                #raise
        pass


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: xxx.py <broker_list> <topic> <fromlast>", file=sys.stderr)
        exit(-1)
    brokers, topic, fromlast  = sys.argv[1:]
    print("Creating new context")
    # local[2]: run locally with two worker threads
    sc = SparkContext("local[2]", "logsdk2")
    ssc = StreamingContext(sc, gdbconf['ddseconds'])

    fromOffsets = None
    if fromlast == "true":
        fromOffsets = getoffset(topic)
        pass

    orderkafkaDstream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers},fromOffsets)
    orderkafkaDstream.foreachRDD(get_output)

    ssc.start()
    ssc.awaitTermination()

Create the corresponding MySQL table:

CREATE TABLE `sparkstreaming` (
  `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
  `topic` VARCHAR(80) NOT NULL DEFAULT '' COMMENT 'kafka topic',
  `partition` INT(11) NOT NULL DEFAULT '0' COMMENT 'kafka partition',
  `offset` BIGINT(20) NOT NULL COMMENT 'Kafka offset',
  `updatetime` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
  `progress` VARCHAR(50) DEFAULT NULL COMMENT 'log time progress (for easy inspection)',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_topic_partition` (`topic`,`partition`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Reference:
https://spark.apache.org/docs/1.6.1/streaming-kafka-integration.html