Using PySpark, how can we turn (key, value) pairs into a custom string and write it out to HDFS? Sometimes we only want to output the value, without the key; sometimes we want to join the key and value into a single string with no tab separator between them.
The official documentation leaves many of the parameters unexplained. Since the output goes to HDFS, the configuration passed in is a Hadoop JobConf configuration, so the Hadoop JobConf documentation is the place to look. On inspection, the conf argument takes a dict.
Setting mapreduce.output.textoutputformat.separator configures the separator used by TextOutputFormat; this key is read by org.apache.hadoop.mapred.TextOutputFormat.
tf.map(extracmap).saveAsHadoopFile("/tmp/test", outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat", conf={'mapreduce.output.textoutputformat.separator': ''})
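With the separator set to an empty string, both cases from the opening can be handled by reshaping the pair before saving. A minimal sketch, assuming an RDD named rdd of (key, value) string pairs (the name and output paths here are placeholders):

# Output only the value: make it the key and leave the value empty.
rdd.map(lambda kv: (kv[1], "")).saveAsHadoopFile(
    "/tmp/value_only",
    outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",
    conf={'mapreduce.output.textoutputformat.separator': ''})

# Merge key and value into one string, with no tab in between.
rdd.map(lambda kv: (kv[0] + kv[1], "")).saveAsHadoopFile(
    "/tmp/merged",
    outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",
    conf={'mapreduce.output.textoutputformat.separator': ''})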
The complete example follows:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
'''
Submit command:
/home/mg/spark-2.2.0/bin/spark-submit --master yarn --num-executors 100 --name pathfilter --queue default good.py 20180128
Local test:
/home/mg/spark-2.2.0/bin/spark-submit --master local good.py 20181009
'''
from __future__ import print_function
from pyspark.sql import SparkSession
import sys
import json
import time
reload(sys)                      # Python 2 only:
sys.setdefaultencoding('utf-8')  # force UTF-8 as the default codec
# Parse one raw log line into a (json_string, "") pair.
def extracmap(line):
    try:
        # Each raw line looks like "<prefix> - {json payload}".
        line_list = line.split(' - {')
        line_json = '{' + line_list[1]
        date_time = line_list[0].split('|')[0].split('.')  # parsed but unused here
        mobj = json.loads(line_json)
        if mobj:
            tarobj = {}
            tarobj['distinct_id'] = mobj.get('uid') or '000001'
            if 'logTime' in mobj:
                tarobj['time'] = int(time.mktime(time.strptime(mobj['logTime'], '%Y-%m-%d %H:%M:%S')))
            else:
                tarobj['time'] = 0
            # Put the whole record in the key and leave the value empty, so that
            # with an empty separator the output line is exactly the JSON string.
            return (json.dumps(tarobj), "")
    except Exception:
        # Malformed lines return None and are dropped by filtermap below.
        pass
# Keep only the lines that extracmap parsed successfully.
def filtermap(line):
    return line is not None
def mymain(date):
    spark = SparkSession \
        .builder \
        .appName("goodoutput") \
        .getOrCreate()
    sc = spark.sparkContext
    tf = sc.textFile("/tmp/mylogs/" + date + "/")
    # With an empty separator, TextOutputFormat writes key and value back to back;
    # since the value is "", each output line is just the JSON string.
    tf.map(extracmap).filter(filtermap).saveAsHadoopFile(
        "/tmp/logs/" + date + "/",
        outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",
        conf={'mapreduce.output.textoutputformat.separator': ''})
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: good.py <date>", file=sys.stderr)
        sys.exit(-1)
    mymain(sys.argv[1])
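For a quick sanity check of the parsing logic, extracmap can be run locally on a made-up log line (the line format and field values below are assumptions for illustration, not real data):

sample = '2018.01.28 12:00:01|INFO - {"uid": "u42", "logTime": "2018-01-28 12:00:01"}'
print(extracmap(sample))
# Prints something like ('{"distinct_id": "u42", "time": 1517112001}', '');
# the exact timestamp depends on the local timezone.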