利用pyspark ,如何把key value 转换成我们自定义的一个string进行输出到hdfs。有些时候是不需要输出key只需要输出value; 有的时候是把key和value合并成一个字符串中间不需要tab分割。
看官方的这个文档很多参数不知道要写什么。
这个是要写入到hdfs,配置是hadoop的jobconf配置。需要看hadoop的jobconf配置。经查验conf 输入为dict类型
mapreduce.output.textoutputformat.separator 即可配置TextOutputFormat的分隔符,这个配置是在org.apache.hadoop.mapred.TextOutputFormat中。
1
tf.map(extracmap).saveAsHadoopFile("/tmp/test",outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",conf={'mapreduce.output.textoutputformat.separator':''})

完整实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#!/usr/bin/python
# -*- coding: UTF-8 -*-

'''
命令
/home/mg/spark-2.2.0/bin/spark-submit --master yarn --num-executors 100 --name pathfilter --queue default good.py 20180128
测试
/home/mg/spark-2.2.0/bin/spark2-submit --master local good.py 20181009
'''

from __future__ import  print_function
from pyspark.sql.types import *
from pyspark import *
from pyspark.sql import Row
from pyspark.sql import SparkSession
import sys
import re
import json
import time

reload(sys)
sys.setdefaultencoding('utf-8')


#日志处理
def extracmap(line):
    try:
        line_list = line.split(' - {')
        line_json = '{'+line_list[1]
        date_time =  line_list[0].split('|')[0].split('.')
        mobj = json.loads(line_json)
        if mobj:
            tarobj = {}
            tarobj['distinct_id']=mobj.has_key('uid') and mobj['uid'] or '000001'
            tarobj['time'] = mobj.has_key('logTime') and int(time.mktime(time.strptime(mobj['logTime'],'%Y-%m-%d %H:%M:%S'))) or 0
            return (json.dumps(tarobj),"")
    except Exception, e:
        pass

def filtermap(line):
    if line :
        return True
    return False


def mymain(date):
    spark = SparkSession \
        .builder \
        .appName("goodoutput") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    sc = spark.sparkContext

    tf = sc.textFile("/tmp/mylogs/"+date+"/")
    tf.map(extracmap).filter(filtermap).saveAsHadoopFile("/tmp/logs/"+date+"/",outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",conf={'mapreduce.output.textoutputformat.separator':''})

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: good.py <date>", file=sys.stderr)
        exit(-1)
    mymain(sys.argv[1])

    1. 圆的定义 https://zh.wikipedia.org/zh-hans/%E5%9C%86
    2. 圆周率 https://zh.wikipedia.org/zh-hans/%E5%9C%93%E5%91%A8%E7%8E%87
    3. 圆的面积 https://zh.wikipedia.org/zh-hans/%E5%9C%86%E7%9A%84%E9%9D%A2%E7%A7%AF
  1. 圆的面积计算
    1. 逼近
      1. 上下界(这个方向研究者主要是优化逼近的速度)
        1. 割圆法
          1. 阿基米德
          2. 威理博·斯涅尔(Cyclometricus,1962年[来源请求])
          3. 惠更斯(De Circuli Magnitudine Inventa,1654年)
      2. 无限逼近
        1. 高斯格点
        2. 数值逼近(最容易操作的方法)
          1. 蒙特卡罗(掷飞镖:如果随机样本一致地散布于一个包含圆的正方形中,样本击中圆的比例趋近于圆和正方形的面积比)
    2. 拼图
      1. 圆分为很大但有限块然后重拼成一个相同面积的正方形
    3. 微积分
      1. 积分&无穷级数展开
  2. 参考文章:
    1. https://zh.m.wikihow.com/%E8%AE%A1%E7%AE%97%E5%9C%86%E5%91%A8%E7%8E%87-Pi
    2. π论 https://web.math.sinica.edu.tw/math_media/d282/28208.pdf