#!/usr/bin/env python3
# -*- coding: utf-8 -*-
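# Example invocation (a minimal sketch, not part of the original source: the
# script filename is hypothetical, and the --jars path simply mirrors the
# spark.jars placeholder configured in initialize_spark below):
#   spark-submit --jars /path/to/mysql-connector-java-8.0.23.jar financial_analysis.py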
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, count, to_date, from_unixtime, unix_timestamp

def initialize_spark():
    """Initialize a SparkSession with Hive support and the MySQL JDBC connector on the classpath."""
    spark = SparkSession.builder \
        .appName("FinancialAnalysis") \
        .config("spark.jars", "/path/to/mysql-connector-java-8.0.23.jar") \
        .enableHiveSupport() \
        .config("hive.metastore.uris", "thrift://192.168.229.122:9083") \
        .getOrCreate()
    return spark

def analyze_bank_grade(spark, mysql_url, mysql_properties):
    """Analyze the distribution of enterprise credit grades."""
    # Count banks per credit grade from the Hive table tb_banks.
    dim_bank_grade = spark.sql("""
        SELECT
            bank_grade AS name,
            COUNT(*) AS value
        FROM
            tb_banks
        GROUP BY
            bank_grade
    """)
    # Persist the result to MySQL, replacing any previous run.
    dim_bank_grade.write \
        .jdbc(mysql_url, "dim_bank_grade", mode="overwrite", properties=mysql_properties)

def analyze_loan_status(spark, mysql_url, mysql_properties):
    """Analyze the distribution of loan repayment statuses."""
    # Count loans per status from the Hive table tb_loans.
    dim_loan_status = spark.sql("""
        SELECT
            loan_status AS name,
            COUNT(*) AS value
        FROM
            tb_loans
        GROUP BY
            loan_status
    """)
    dim_loan_status.write \
        .jdbc(mysql_url, "dim_loan_status", mode="overwrite", properties=mysql_properties)

def analyze_loan_amount(spark, mysql_url, mysql_properties):
    """Analyze the trend of loan amounts over time."""
    # Sum loan amounts per date, converted to units of 10,000.
    dim_loan_amount = spark.sql("""
        SELECT
            loan_date AS date,
            ROUND(SUM(loan_amount) / 10000, 2) AS amount
        FROM
            tb_loans
        GROUP BY
            loan_date
    """)
    dim_loan_amount.write \
        .jdbc(mysql_url, "dim_loan_amount", mode="overwrite", properties=mysql_properties)

def analyze_repayment_status(spark, mysql_url, mysql_properties):
    """Analyze enterprise repayment status (on-time vs. overdue amounts) by date."""
    # '按时还款' (repaid on time) and '逾期还款' (repaid overdue) are literal
    # values stored in tb_repayments.repayment_status, so they are kept as-is.
    dim_repayment_status = spark.sql("""
        SELECT
            repayment_date,
            ROUND(SUM(CASE WHEN repayment_status = '按时还款' THEN repayment_amount ELSE 0 END), 2) AS on_time_amount,
            ROUND(SUM(CASE WHEN repayment_status = '逾期还款' THEN repayment_amount ELSE 0 END), 2) AS overdue_amount,
            ROUND(SUM(repayment_amount), 2) AS total_amount
        FROM
            tb_repayments
        WHERE
            repayment_status IN ('按时还款', '逾期还款')
        GROUP BY
            repayment_date
        ORDER BY
            repayment_date
    """)
    dim_repayment_status.write \
        .jdbc(mysql_url, "dim_repayment_status", mode="overwrite", properties=mysql_properties)

def analyze_jie_count(spark, mysql_url, mysql_properties):
    """Analyze the number of loans issued per year."""
    tb_loans_df = spark.table("tb_loans")
    # loan_date is stored as a 'yyyy/MM/dd' string, so parse it into a date
    # before extracting the year and counting loans per year.
    dim_jie_count = tb_loans_df.withColumn(
        "loan_date_parsed",
        to_date(from_unixtime(unix_timestamp(col("loan_date"), "yyyy/MM/dd")))
    ).groupBy(
        year("loan_date_parsed").alias("name")
    ).agg(
        count("*").alias("value")
    ).orderBy("name")

    dim_jie_count.write \
        .jdbc(mysql_url, "dim_jie_count", mode="overwrite", properties=mysql_properties)

def main():
    spark = None
    try:
        # Initialize Spark
        spark = initialize_spark()

        # MySQL connection settings
        mysql_properties = {
            "user": "root",
            "password": "123456",
            # Connector/J 8.x driver class (the legacy com.mysql.jdbc.Driver name is deprecated)
            "driver": "com.mysql.cj.jdbc.Driver"
        }
        mysql_url = "jdbc:mysql://192.168.229.122:3306/bigdata_ibecs?useSSL=false"

        # Run each analysis task
        analyze_bank_grade(spark, mysql_url, mysql_properties)
        analyze_loan_status(spark, mysql_url, mysql_properties)
        analyze_loan_amount(spark, mysql_url, mysql_properties)
        analyze_repayment_status(spark, mysql_url, mysql_properties)
        analyze_jie_count(spark, mysql_url, mysql_properties)
    finally:
        # Stop the SparkSession
        if spark is not None:
            spark.stop()

if __name__ == "__main__":
    main()