Spark 调用 HTTP 接口并将返回数据落地到数仓（或数据库）

发布时间：2023-06-07 08:03:04　作者：kyrie2333

POM
    <!-- fastjson：解析接口返回的 JSON -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>

    <!-- spark-sql 依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.tools.version}</artifactId>
    </dependency>

    <!--spark-sql整合hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.tools.version}</artifactId>
    </dependency>

HTTP 工具类（基于 Apache HttpClient 4.x；以下方法放在 `object HttpUtils` 中，Demo 里通过 `HttpUtils.getRequest` 调用）
/**
 * Sends an HTTP GET to `url` and returns the response body as a string.
 *
 * Best-effort: any failure while executing the request or reading the response
 * is printed to stderr and "" is returned, so callers must treat "" as "no data".
 * No RequestConfig (timeouts) is set here, so the client's defaults apply.
 *
 * @param url absolute URL to request
 * @return the response body (decoded as UTF-8 when the server declares no charset),
 *         "" when the response carries no entity, or "" if the request failed
 */
def getRequest(url: String): String = {
  // Closes a client/response, logging rather than propagating close-time I/O errors.
  def closeQuietly(resource: java.io.Closeable): Unit =
    try {
      resource.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }

  val httpGet = new HttpGet(url)
  val httpClient = HttpClientBuilder.create().build()
  try {
    val response = httpClient.execute(httpGet)
    try {
      // Without an explicit default, EntityUtils falls back to ISO-8859-1 when the
      // Content-Type has no charset, which garbles non-ASCII (e.g. Chinese) text.
      Option(response.getEntity)
        .map(entity => EntityUtils.toString(entity, "UTF-8"))
        .getOrElse("")
    } finally {
      // Close the response before the client that produced it.
      closeQuietly(response)
    }
  } catch {
    case ex: Exception =>
      ex.printStackTrace()
      ""
  } finally {
    closeQuietly(httpClient)
  }
}

/**
 * Sends an HTTP POST with no request body to `url` and returns the response body.
 *
 * Best-effort: any failure while executing the request or reading the response
 * is printed to stderr and "" is returned, so callers must treat "" as "no data".
 * No RequestConfig (timeouts) is set here, so the client's defaults apply.
 *
 * @param url absolute URL to post to
 * @return the response body (decoded as UTF-8 when the server declares no charset),
 *         "" when the response carries no entity, or "" if the request failed
 */
def postRequest(url: String): String = {
  // Closes a client/response, logging rather than propagating close-time I/O errors.
  def closeQuietly(resource: java.io.Closeable): Unit =
    try {
      resource.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }

  val httpPost = new HttpPost(url)
  // "utf-8" is the registered charset name, matching postRequest(url, bodyData).
  httpPost.setHeader("Content-Type", "application/json;charset=utf-8")

  val httpClient = HttpClientBuilder.create().build()
  try {
    val response = httpClient.execute(httpPost)
    try {
      // Default to UTF-8 instead of EntityUtils' ISO-8859-1 fallback.
      Option(response.getEntity)
        .map(entity => EntityUtils.toString(entity, "UTF-8"))
        .getOrElse("")
    } finally {
      // Close the response before the client that produced it.
      closeQuietly(response)
    }
  } catch {
    case ex: Exception =>
      ex.printStackTrace()
      ""
  } finally {
    closeQuietly(httpClient)
  }
}

/**
 * Sends `bodyData` as a JSON POST body to `url` and returns the response body.
 *
 * Unlike the other helpers, request failures are NOT swallowed: connection errors,
 * timeouts and read errors propagate to the caller. Only errors raised while
 * closing the response or client are printed and ignored.
 *
 * @param url      absolute URL to post to
 * @param bodyData request body, sent UTF-8 encoded with chunked transfer disabled
 * @return the response body (decoded as UTF-8 when the server declares no charset),
 *         or "" when the response carries no entity
 */
def postRequest(url: String, bodyData: String): String = {
  // Closes a client/response, logging rather than propagating close-time I/O errors.
  def closeQuietly(resource: java.io.Closeable): Unit =
    try {
      resource.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }

  // Build the whole request before creating the client, so a failure here
  // (e.g. a malformed URL) cannot leak an open client.
  val httpPost = new HttpPost(url)
  httpPost.setHeader("Content-Type", "application/json;charset=utf-8")
  httpPost.setConfig(
    RequestConfig
      .custom()
      .setConnectTimeout(5000)           // ms allowed to establish the connection
      .setConnectionRequestTimeout(1000) // ms allowed to obtain a connection from the manager
      .setSocketTimeout(5000)            // ms of inactivity allowed while waiting for data
      .build())

  val entity = new StringEntity(bodyData, "utf-8")
  entity.setChunked(false)
  httpPost.setEntity(entity)

  val httpClient = HttpClientBuilder.create().build()
  try {
    // If execute() throws there is no response to close, so its exception reaches
    // the caller unmasked instead of being replaced by a NullPointerException.
    val response = httpClient.execute(httpPost)
    try {
      Option(response.getEntity)
        .map(body => EntityUtils.toString(body, "UTF-8"))
        .getOrElse("")
    } finally {
      // Close the response before the client that produced it.
      closeQuietly(response)
    }
  } finally {
    closeQuietly(httpClient)
  }
}

Demo
先自己建一个 case class：字段名与接口返回 JSON 中的 key 保持一致，字段顺序与目标 Hive 表的列顺序保持一致（`insertInto` 按位置而非列名匹配列）。
/**
 * One record of the API's `resultData` array.
 *
 * Placeholder template: the original listed ten parameters all named `column`,
 * which does not compile. Rename `column1`..`column10` to the JSON keys returned by
 * your endpoint, and keep their order identical to the target Hive table's
 * non-partition columns, because `insertInto` resolves columns by position.
 * NOTE(review): this class has no no-arg constructor. Confirm that your fastjson
 * version can instantiate it via `JSON.parseArray`.
 */
final case class PropertyManagementSystemCaseClass(
  column1: String,
  column2: String,
  column3: String,
  column4: String,
  column5: String,
  column6: String,
  column7: String,
  column8: String,
  column9: String,
  column10: String
)

/**
 * Spark job: fetches one day's records from an HTTP endpoint and writes them into
 * the Hive table `ods.yszt_grid_collection_rate`, tagging each row with a `dt` column.
 */
object PropertyManagementSystem extends SparkBase {
  /** Default business date, used when no date is passed as the first program argument. */
  val date = "20210610"

  /**
   * Writes `data` into `ods.yszt_grid_collection_rate`, appending a `dt` column.
   *
   * `insertInto` matches columns by position, so the case class field order followed
   * by `dt` must line up with the table definition.
   * NOTE(review): with SaveMode.Overwrite, whether only the `dt` partition or the whole
   * table is replaced depends on the table type and
   * `spark.sql.sources.partitionOverwriteMode`. Confirm before running in production.
   *
   * @param spark active Spark session
   * @param data  rows parsed from the API response
   * @param dt    value written to the `dt` column; defaults to [[date]]
   */
  def executor(spark: SparkSession,
               data: util.List[PropertyManagementSystemCaseClass],
               dt: String = date): Unit = {
    import spark.implicits._
    val ds: Dataset[PropertyManagementSystemCaseClass] = spark.createDataset(data)

    ds
      .withColumn("dt", lit(dt))
      .write
      .mode(SaveMode.Overwrite)
      .insertInto("ods.yszt_grid_collection_rate")
  }

  /**
   * Entry point. `args(0)`, when present, overrides the default business [[date]].
   * The Spark session is always stopped, even if fetching or writing fails.
   */
  def main(args: Array[String]): Unit = {
    val runDate = args.headOption.getOrElse(date)
    val spark = getContext(this.getClass.getName)
    try {
      val requestJson = HttpUtils.getRequest(s"http://127.0.0.1/XXXX/XXXXX?date=$runDate")
      // getRequest returns "" on failure; fail with a clear message instead of an
      // opaque NullPointerException inside fastjson.
      require(requestJson != null && requestJson.trim.nonEmpty,
        s"Empty response from API for date=$runDate")
      val requestData = JSON.parseObject(requestJson).getString("resultData")
      require(requestData != null, s"Response has no 'resultData' field for date=$runDate")
      val caseClassData = JSON.parseArray(requestData, classOf[PropertyManagementSystemCaseClass])
      executor(spark, caseClassData, runDate)
    } finally {
      spark.stop()
    }
  }
}

作者:XinXing
链接:https://juejin.cn/post/6973855224742035469
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。