pyspark相关配置

  • 时间:
  • 来源:互联网

hadoop(配置文件都在$HADOOP_HOME/etc/hadoop)

  1. hadoop.env.sh
    #export JAVA_HOME=${JAVA_HOME}
    export JAVA_HOME=/opt/modules/jdk1.8.0_11
    
    #配置java_home
  2. core-site.xml
    <configuration>
    	<property>
    		<name>fs.default.name</name>
    		<value>hdfs://pyspark-1.bigload.com:8020</value>
    	</property>
    </configuration>

     

  3. hdfs-site.xml
    <configuration>
    	<property>
    		<name>dfs.namenode.name.dir</name>
    		<value>/opt/modules/hadoop-2.6.0-cdh5.7.0/tmp/dfs/name</value>
    	</property>
    	<property>
    		<name>dfs.datanode.data.dir</name>
    		<value>/opt/modules/hadoop-2.6.0-cdh5.7.0/tmp/dfs/data</value>
    	</property>
    	
    	<!--副本数-->
    	<property>
    		<name>dfs.replication</name>
    		<value>1</value>
    	</property>
    </configuration>

     

  4. mapred-site.xml(cp mapred-site.xml.template mapred-site.xml)
    <configuration>
    	<property>
    		<name>mapreduce.framework.name</name>
    		<value>yarn</value>
    	</property>
    </configuration>

     

  5. yarn-site.xml
    <configuration>
    	<property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
        </property>
    </configuration>
    

     

  6. 格式化
    bin/hadoop namenode –format
    

     

  7. 无秘钥登录
    ssh无秘钥登录
    cd ~/.ssh
    主节点 NameNode
    1)生成一对公钥与秘钥
    ssh-keygen -t  rsa
    2)拷贝公钥到各个机器上
    ssh-copy-id   pyspark-1.bigload.com 
    ssh-copy-id localhost
    ssh-copy-id 0.0.0.0
    
    

     

  8. 启动hdfs
    sbin/start-dfs.sh

     

  9. 游览器查看(http://pyspark-1.bigload.com:50070),传个文件
    bin/hadoop dfs -mkdir -p /test
    bin/hadoop fs -put README.txt /test

     

  10. 启动yarn
    sbin/start-yarn.sh
  11. 游览器查看(http://pyspark-1.bigload.com:8088/)

maven

  1. 解压
    tar -zxf apache-maven-3.3.9-bin.tar.gz -C /opt/modules/

     

  2. 建立local/repo文件夹,并且修改config/settings.xml
    创建local/repo文件夹
    
        mkdir -p local/repo
    
    
    settings.xml
    
        <localRepository>/opt/modules/apache-maven-3.3.9/local/repo</localRepository>
    
    
        并添加镜像:
        <mirror>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

python

yum源(阿里云)http://www.cnblogs.com/lin1/p/5607121.html

yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel gcc

tar -zxf Python-3.6.9.tgz
cd Python-3.6.9
./configure --prefix=/opt/modules/python3/
cd /opt/modules/python3
make && make install

 

spark

解压

修改为对应的版本(dev/make-distribution.sh)
初始
VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ 2>/dev/null | grep -v "INFO" | tail -n 1)
SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ 2>/dev/null\
| grep -v "INFO"\
| tail -n 1)
SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null\
| grep -v "INFO"\
| tail -n 1)
SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@ 2>/dev/null\
| grep -v "INFO"\
| fgrep --count "<id>hive</id>";\
# Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\
# because we use "set -o pipefail"
echo -n)
替换为下面对应的参数值
VERSION=2.4.4
SCALA_VERSION=2.11.12
SPARK_HADOOP_VERSION=2.6.0-cdh5.7.0
SPARK_HIVE=1

e.spark pom.xml 添加 cdh reponsitory
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>

如果不添加会出现如下错误信息:
Failed to execute goal on project spark-launcher_2.11: Could not resolve dependencies for project org.apache.spark:spark-launcher_2.11:jar:2.1.0: Could not find artifact org.apache.hadoop:hadoop-client:jar:2.5.0-cdh5.3.6

[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :spark-launcher_2.11
-rf :spark-launcher_2.11

./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0

编译时间会有些长,会生成一个spark-2.4.4-bin-2.6.0-cdh5.7.0.tgz
tar -zxf spark-2.4.4-bin-2.6.0-cdh5.7.0.tgz -C /opt/modules
随后进入编译后的文件夹
cd /opt/modules/spark-2.4.4-bin-2.6.0-cdh5.7.0
可以用bin/spark-shell测试一下

配置SPARK默认使用 PYTHON3
在 bin/pyspark 文件中添加 export PYSPARK_PYTHON=python3
(python3已经加入环境变量)

http://www.bubuko.com/infodetail-2294107.html

官网:http://spark.apache.org/

源码:https://github.com/apache/spark

spark core核心rdd

什么是rdd,弹性分布式数据集(Resilient Distributed Dataset )

1)rdd是一个抽象类

2)带泛型的,可以支持多种类型:String,User,Person

rdd特性

    不可变

   分区

   并行计算的

1.A list of partitions

RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的list;将数据加载为RDD时,一般会遵循数据的本地性(一般一个hdfs里的block会加载为一个partition)。

2.A function for computing each split

一个函数计算每一个分片,RDD的每个partition上面都会有function,也就是函数应用,其作用是实现RDD之间partition的转换。

3.A list of dependencies on other RDDs

RDD会记录它的依赖 ,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。为了容错(重算,cache,checkpoint),也就是说在内存中的RDD操作时出错或丢失会进行重算

rdd1==>rdd2==>rdd3==>rdd4

rdd2=f1(rdd1)

rdd3=f2(rdd2)

rdd4=f3(rdd3)

 

4.Optionally,a Partitioner for Key-value RDDs

  可选项,如果RDD里面存的数据是key-value形式,则可以传递一个自定义的Partitioner进行重新分区,例如这里自定义的Partitioner是基于key进行分区,那则会将不同RDD里面的相同key的数据放到同一个partition里面

5.Optionally, a list of preferred locations to compute each split on

最优的位置去计算,也就是数据的本地性。(数据在哪优先吧作业调度到数据所在的节点进行计算:移动数据不如移动计算)

图解rdd

sparkcontext & sparkconf

Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建一个,SparkContext您首先需要构建一个SparkConf对象,其中包含有关您的应用程序的信息。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName参数是您的应用程序显示在集群UI上的名称。 master是Spark,Mesos或YARN群集URL或特殊的“本地”字符串,以本地模式运行。实际上,当在集群上运行时,您将不希望master在程序中进行硬编码,而是在其中启动应用程序spark-submit并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark。

rdd创建方式

1.并行集合

>>> data=[1,2,3,4,5]
>>> distData=sc.parallelize(data)
>>> distData
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
>>> data
[1, 2, 3, 4, 5]
>>> disData.collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'disData' is not defined
>>> distData.collect()
[1, 2, 3, 4, 5]
>>> distData.reduce(lambda a,b:a+b)
15                                       

2.外部数据集

 

PySpark可以从Hadoop支持的任何存储源创建分布式数据集,包括您的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。

可以使用SparkContexttextFile方法创建文本文件RDD 。此方法需要一个URI的文件(本地路径的机器上,或一个hdfs://s3a://等URI),并读取其作为行的集合。这是一个示例调用:

>>> distFile = sc.textFile("data.txt")

一旦创建,distFile就可以通过数据集操作对其进行操作。例如,我们可以使用mapreduce操作将所有行的大小相加,如下所示:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

关于使用Spark读取文件的一些注意事项:

  • 如果在本地文件系统上使用路径,则还必须在工作节点上的相同路径上访问该文件。将文件复制到所有工作服务器,或者使用网络安装的共享文件系统。

  • Spark的所有基于文件的输入方法(包括textFile)都支持在目录,压缩文件和通配符上运行。例如,你可以使用textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

  • textFile方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(HDFS中的块默认为128MB),但是您也可以通过传递更大的值来请求更大数量的分区。请注意,分区不能少于块。

除文本文件外,Spark的Python API还支持其他几种数据格式:

  • SparkContext.wholeTextFiles使您可以读取包含多个小文本文件的目录,并将每个小文本文件作为(文件名,内容)对返回。与相比textFile,会在每个文件的每一行返回一条记录。

  • RDD.saveAsPickleFileSparkContext.pickleFile支持以包含腌制Python对象的简单格式保存RDD。批处理用于咸菜序列化,默认批处理大小为10。

  • SequenceFile和Hadoop输入/输出格式

请注意,此功能当前已标记Experimental,仅供高级用户使用。将来可能会替换为基于Spark SQL的读/写支持,在这种情况下,Spark SQL是首选方法。

sc.textFile("file:///opt/datas/README.md").collect()
sc.textFile("hdfs://pyspark-1.bigload.com/hadoop/README.md").collect()

spark应用程序开发及运行

我是把一切环境都是装载了虚拟机上,发现在Windows还需要再做一次环境绝望之余看见了能远程连接,换上自己的虚拟机的ip就行了,反正是怎么省事怎么来,日后要是出现问题了再改吧,感谢分享

Windows上的PyCharm 远程连接调试pyspark


from pyspark import SparkConf,SparkContext
#创建sparkconf:设置的是spark相关的参数信息
conf=SparkConf().setMaster("local[2]").setAppName("spark0301")
#创建sparkcontext
sc=SparkContext(conf=conf)

#业务逻辑
data=[1,3,44,5]
distData=sc.parallelize(data)
print(distData.collect())

sc.stop()

 

Laozizuiku
发布了14 篇原创文章 · 获赞 6 · 访问量 4314
私信 关注

本文链接http://element-ui.cn/news/show-941.aspx