利用pyspark ,如何把key value 转换成我们自定义的一个string进行输出到hdfs。有些时候是不需要输出key只需要输出value; 有的时候是把key和value合并成一个字符串中间不需要tab分割。
看官方的这个文档很多参数不知道要写什么。
这个是要写入到hdfs,配置是hadoop的jobconf配置。需要看hadoop的jobconf配置。经查验conf 输入为dict类型
mapreduce.output.textoutputformat.separator 即可配置TextOutputFormat的分隔符,这个配置是在org.apache.hadoop.mapred.TextOutputFormat中。
1 | tf.map(extracmap).saveAsHadoopFile("/tmp/test",outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",conf={'mapreduce.output.textoutputformat.separator':''}) |
完整实例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | #!/usr/bin/python # -*- coding: UTF-8 -*- ''' 命令 /home/mg/spark-2.2.0/bin/spark-submit --master yarn --num-executors 100 --name pathfilter --queue default good.py 20180128 测试 /home/mg/spark-2.2.0/bin/spark2-submit --master local good.py 20181009 ''' from __future__ import print_function from pyspark.sql.types import * from pyspark import * from pyspark.sql import Row from pyspark.sql import SparkSession import sys import re import json import time reload(sys) sys.setdefaultencoding('utf-8') #日志处理 def extracmap(line): try: line_list = line.split(' - {') line_json = '{'+line_list[1] date_time = line_list[0].split('|')[0].split('.') mobj = json.loads(line_json) if mobj: tarobj = {} tarobj['distinct_id']=mobj.has_key('uid') and mobj['uid'] or '000001' tarobj['time'] = mobj.has_key('logTime') and int(time.mktime(time.strptime(mobj['logTime'],'%Y-%m-%d %H:%M:%S'))) or 0 return (json.dumps(tarobj),"") except Exception, e: pass def filtermap(line): if line : return True return False def mymain(date): spark = SparkSession \ .builder \ .appName("goodoutput") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() sc = spark.sparkContext tf = sc.textFile("/tmp/mylogs/"+date+"/") tf.map(extracmap).filter(filtermap).saveAsHadoopFile("/tmp/logs/"+date+"/",outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",conf={'mapreduce.output.textoutputformat.separator':''}) if __name__ == '__main__': if len(sys.argv) != 2: print("Usage: good.py <date>", file=sys.stderr) exit(-1) mymain(sys.argv[1]) |