Writing JSON to Elasticsearch from Spark with Scala

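1. Preface

I have recently been studying Spark, Hadoop, and Elasticsearch. I am still a beginner and there is a lot I do not understand yet. After a day of investigation I learned how to write JSON data into ES from Scala. Because this was my first attempt, even this simple program took a fair amount of time, and I ran into many pitfalls along the way. I am recording them here so that I do not repeat them.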

2. Program

After some trial and error, the code ended up as follows:

package logDataDoWith

import net.sf.json.JSONObject
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.elasticsearch.spark.rdd.EsSpark

import scala.beans.BeanProperty
import scala.language.postfixOps

/**
  * Created by 562272115 on 2016/11/15.
  */

/* Log record */
case class LogDataBean(@BeanProperty var logtime: String,
                       @BeanProperty var mobile: String,
                       @BeanProperty var pkgname: String,
                       @BeanProperty var url: String,
                       @BeanProperty var host: String,
                       @BeanProperty var user_agent: String,
                       @BeanProperty var referer: String,
                       @BeanProperty var res_code: String,
                       @BeanProperty var res_length: Int,
                       @BeanProperty var time_cost: String,
                       @BeanProperty var serial: String,
                       @BeanProperty var appname: String,
                       @BeanProperty var year: Int,
                       @BeanProperty var month: Int,
                       @BeanProperty var day: Int,
                       @BeanProperty var hour: Int,
                       @BeanProperty var minute: Int,
                       @BeanProperty var second: Int,
                       @BeanProperty var rn: String)

object LogDataToEs {
  def main(args: Array[String]): Unit = {
    // Input file path
    val filePath = "E:/data/sample.txt"
    // Local Spark configuration for testing on a single machine;
    // when running on a Spark cluster, set the cluster master instead
    val conf = new SparkConf().setAppName("local").setMaster("local")
    conf.set("es.index.auto.create", "true")
    conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1") // ES node address
    val scLocal = new SparkContext(conf)
    // Read the file and turn every line into a JSON string
    val file = scLocal.textFile(filePath).map(operLineRdd(_))
    println("------------------------------------------------")
    EsSpark.saveJsonToEs(file, "zenisoft_case/wifi")
    println("------------------------------------------------")
    scLocal.stop()
  }

  def operLineRdd(line: String): String = {
    // Split the line on ","; the limit of 16 caps the result at 16 fields.
    // See the documentation of split for details.
    val values = line.split(",", 16)
    val log = LogDataBean(
      values(0), values(1), values(2), values(3), values(4),
      values(5), values(6), values(7), values(8).toInt, values(9), values(10), values(11),
      // year, month, day, hour, minute, second are cut out of the logtime field
      values(0).substring(0, 4).toInt,
      values(0).substring(4, 6).toInt,
      values(0).substring(6, 8).toInt,
      values(0).substring(8, 10).toInt,
      values(0).substring(10, 12).toInt,
      values(0).substring(12, 14).toInt,
      values(15))
    JSONObject.fromObject(log).toString
  }
}
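The two parsing steps in operLineRdd can be tried out without Spark or ES. The following is a minimal sketch, not the program above: LineParsingSketch, splitFields, timeParts, and TimeParts are hypothetical names, and it assumes (judging from the substring offsets) that logtime is a 14-digit yyyyMMddHHmmss string.

```scala
// Hypothetical helpers for illustration; only String.split and substring come from the program.
object LineParsingSketch {
  // The six time components that the program stores as separate fields.
  final case class TimeParts(year: Int, month: Int, day: Int, hour: Int, minute: Int, second: Int)

  // split with a positive limit returns at most `limit` fields;
  // the last field keeps any remaining commas, and trailing empty fields are kept.
  def splitFields(line: String, limit: Int = 16): Array[String] = line.split(",", limit)

  // Assumes logtime starts with 14 digits (yyyyMMddHHmmss); returns None otherwise
  // instead of throwing, unlike the bare substring/toInt calls.
  def timeParts(logtime: String): Option[TimeParts] =
    if (logtime.length >= 14 && logtime.take(14).forall(_.isDigit))
      Some(TimeParts(
        logtime.substring(0, 4).toInt,
        logtime.substring(4, 6).toInt,
        logtime.substring(6, 8).toInt,
        logtime.substring(8, 10).toInt,
        logtime.substring(10, 12).toInt,
        logtime.substring(12, 14).toInt))
    else None
}
```

One reason to pass the limit is that split(",") without it silently drops trailing empty fields, so a line ending in ",," would yield fewer values and values(15) could throw an ArrayIndexOutOfBoundsException.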

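3. Notes
(1) The parameters of the primary constructor need the @BeanProperty annotation. json-lib reads values through JavaBean-style getters (getXxx) found by reflection, and without the annotation a Scala case class does not have them, so the values cannot be read when converting to JSON.
(2) In conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1") the port does not need to be included. The port has its own setting: conf.set(ConfigurationOptions.ES_PORT, "9200"). The default port is 9200.
(3) Pay attention to how the Spark version and the elasticsearch-hadoop version are paired in Maven. A mismatch causes version conflicts and the program fails to run. Also check the Scala version, otherwise compilation problems occur. I originally used Spark 2.0.0 with Scala 2.11.8 and hit a version conflict. After resolving that conflict, the Scala version turned out to be too high and caused compilation problems. After some changes it finally worked; the Maven file I ended up with is shown below (Scala 2.10.5, spark-core_2.10 1.6.1, elasticsearch-hadoop 2.4.0). Please refer to it in detail.
(4) At first, when I used a name with a test prefix (EsSpark.saveJsonToEs(file, "testLog/test")), the write failed with an invalid-operation error. I did not track down the cause at the time; it only succeeded after I changed the name. A likely explanation is that Elasticsearch requires index names to be lowercase and testLog contains an uppercase letter, but I have not verified this. To be explored later.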
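Regarding the last note: Elasticsearch rejects index names that contain uppercase letters, so a name such as testLog can be caught before the job is submitted. The following is a minimal sketch of such a pre-check. IndexNameSketch, indexPart, and isLowercaseIndex are hypothetical names and not part of elasticsearch-hadoop, and the check covers only the lowercase rule, not the other naming restrictions of Elasticsearch.

```scala
import java.util.Locale

// Hypothetical pre-check for illustration; not an elasticsearch-hadoop API.
object IndexNameSketch {
  // A resource string has the form "index/type"; keep the part before the first slash.
  def indexPart(resource: String): String = resource.takeWhile(_ != '/')

  // Partial check only: true when the index part is non-empty and has no uppercase letters.
  def isLowercaseIndex(resource: String): Boolean = {
    val index = indexPart(resource)
    index.nonEmpty && index == index.toLowerCase(Locale.ROOT)
  }
}
```

With this helper, isLowercaseIndex("zenisoft_case/wifi") holds while isLowercaseIndex("testLog/test") does not, which matches which of the two names worked in practice.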

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>logDataDoWith</groupId>
  <artifactId>logDataDoWith</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.10.5</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>mvnrepository</id>
      <name>mvnrepository</name>
      <url>http://mvnrepository.com/</url>
    </repository>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://repo1.maven.org/maven2</url>
    </repository>
    <repository>
      <id>www.sonatype.org_nexus</id>
      <name>www.sonatype.org_nexus</name>
      <url>http://www.sonatype.org/nexus/</url>
    </repository>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/.</url>
    </repository>
    <repository>
      <id>jboss</id>
      <url>http://repository.jboss.org/nexus/content/groups/public-jboss</url>
    </repository>
    <repository>
      <id>Sonatype snapshots</id>
      <url>http://oss.sonatype.org/content/repositories/snapshots/</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>

  <!-- Dependencies -->
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>net.sf.json-lib</groupId>
      <artifactId>json-lib</artifactId>
      <version>2.4</version>
      <classifier>jdk15</classifier>
    </dependency>
    <dependency>
      <groupId>net.sf.ezmorph</groupId>
      <artifactId>ezmorph</artifactId>
      <version>1.0.6</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.5</version>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
    <!-- <dependency>
         <groupId>commons-beanutils</groupId>
         <artifactId>commons-beanutils</artifactId>
         <version>1.9.2</version>
     </dependency>-->
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-hadoop</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>com.googlecode.json-simple</groupId>
      <artifactId>json-simple</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20160810</version>
    </dependency>
  </dependencies>
</project>
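The json-lib dependency in the pom is the reason for the first note: json-lib discovers properties through JavaBean getters. The following small sketch, using only the Scala standard library and Java reflection, shows what @BeanProperty changes on a case class. BeanPropertySketch, PlainLog, BeanLog, and beanGetters are hypothetical names chosen for illustration.

```scala
import scala.beans.BeanProperty

// Hypothetical demo classes; only @BeanProperty itself comes from the program.
object BeanPropertySketch {
  // Plain case class: the compiler generates an accessor named mobile(), not getMobile().
  case class PlainLog(mobile: String)

  // With @BeanProperty the compiler additionally emits getMobile() (and setMobile for a var).
  case class BeanLog(@BeanProperty var mobile: String)

  // Names of public zero-argument getXxx methods, excluding getClass from java.lang.Object.
  def beanGetters(cls: Class[_]): Set[String] =
    cls.getMethods
      .filter(m => m.getName.startsWith("get") && m.getName != "getClass" && m.getParameterTypes.isEmpty)
      .map(_.getName)
      .toSet
}
```

Calling beanGetters on BeanLog finds getMobile, while on PlainLog it finds no getters at all, which is consistent with the annotated fields being the ones that show up in the generated JSON.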
