pyspark saveAsHadoopFile

利用pyspark ,如何把key value 转换成我们自定义的一个string进行输出到hdfs。有些时候是不需要输出key只需要输出value; 有的时候是把key和value合并成一个字符串中间不需要tab分割。
看官方的这个文档很多参数不知道要写什么。
这个是要写入到hdfs,配置是hadoop的jobconf配置。需要看hadoop的jobconf配置。经查验conf 输入为dict类型
mapreduce.output.textoutputformat.separator 即可配置TextOutputFormat的分隔符,这个配置是在org.apache.hadoop.mapred.TextOutputFormat中。
1
tf.map(extracmap).saveAsHadoopFile("/tmp/test",outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",conf={'mapreduce.output.textoutputformat.separator':''})

完整实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#!/usr/bin/python
# -*- coding: UTF-8 -*-

'''
命令
/home/mg/spark-2.2.0/bin/spark-submit --master yarn --num-executors 100 --name pathfilter --queue default good.py 20180128
测试
/home/mg/spark-2.2.0/bin/spark2-submit --master local good.py 20181009
'''

from __future__ import  print_function
from pyspark.sql.types import *
from pyspark import *
from pyspark.sql import Row
from pyspark.sql import SparkSession
import sys
import re
import json
import time

reload(sys)
sys.setdefaultencoding('utf-8')


#日志处理
def extracmap(line):
    try:
        line_list = line.split(' - {')
        line_json = '{'+line_list[1]
        date_time =  line_list[0].split('|')[0].split('.')
        mobj = json.loads(line_json)
        if mobj:
            tarobj = {}
            tarobj['distinct_id']=mobj.has_key('uid') and mobj['uid'] or '000001'
            tarobj['time'] = mobj.has_key('logTime') and int(time.mktime(time.strptime(mobj['logTime'],'%Y-%m-%d %H:%M:%S'))) or 0
            return (json.dumps(tarobj),"")
    except Exception, e:
        pass

def filtermap(line):
    if line :
        return True
    return False


def mymain(date):
    spark = SparkSession \
        .builder \
        .appName("goodoutput") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    sc = spark.sparkContext

    tf = sc.textFile("/tmp/mylogs/"+date+"/")
    tf.map(extracmap).filter(filtermap).saveAsHadoopFile("/tmp/logs/"+date+"/",outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",conf={'mapreduce.output.textoutputformat.separator':''})

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: good.py <date>", file=sys.stderr)
        exit(-1)
    mymain(sys.argv[1])

发表评论

电子邮件地址不会被公开。 必填项已用*标注