#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, to_date, year


def initialize_spark():
    """Initialize a SparkSession with Hive support and the MySQL connector jar."""
    spark = SparkSession.builder \
        .appName("FinancialAnalysis") \
        .config("spark.jars", "/path/to/mysql-connector-java-8.0.23.jar") \
        .config("hive.metastore.uris", "thrift://192.168.229.122:9083") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark


def analyze_bank_grade(spark, mysql_url, mysql_properties):
    """Analyze the distribution of enterprise credit grades."""
    dim_bank_grade = spark.sql("""
        SELECT bank_grade AS name,
               COUNT(*)   AS value
        FROM tb_banks
        GROUP BY bank_grade
    """)
    dim_bank_grade.write \
        .jdbc(mysql_url, "dim_bank_grade", mode="overwrite", properties=mysql_properties)


def analyze_loan_status(spark, mysql_url, mysql_properties):
    """Analyze the loan status distribution."""
    dim_loan_status = spark.sql("""
        SELECT loan_status AS name,
               COUNT(*)    AS value
        FROM tb_loans
        GROUP BY loan_status
    """)
    dim_loan_status.write \
        .jdbc(mysql_url, "dim_loan_status", mode="overwrite", properties=mysql_properties)


def analyze_loan_amount(spark, mysql_url, mysql_properties):
    """Analyze the trend of loan amounts over time."""
    # Amounts are reported in units of 10,000 (万元), rounded to 2 decimals.
    dim_loan_amount = spark.sql("""
        SELECT loan_date AS date,
               ROUND(SUM(loan_amount) / 10000, 2) AS amount
        FROM tb_loans
        GROUP BY loan_date
    """)
    dim_loan_amount.write \
        .jdbc(mysql_url, "dim_loan_amount", mode="overwrite", properties=mysql_properties)


def analyze_repayment_status(spark, mysql_url, mysql_properties):
    """Analyze enterprise repayment status: on-time vs. overdue amounts per day."""
    # '按时还款' = repaid on time, '逾期还款' = repaid overdue; the literals must
    # match the values stored in tb_repayments.
    dim_repayment_status = spark.sql("""
        SELECT repayment_date,
               ROUND(SUM(CASE WHEN repayment_status = '按时还款'
                              THEN repayment_amount ELSE 0 END), 2) AS on_time_amount,
               ROUND(SUM(CASE WHEN repayment_status = '逾期还款'
                              THEN repayment_amount ELSE 0 END), 2) AS overdue_amount,
               ROUND(SUM(repayment_amount), 2) AS total_amount
        FROM tb_repayments
        WHERE repayment_status IN ('按时还款', '逾期还款')
        GROUP BY repayment_date
        ORDER BY repayment_date
    """)
    dim_repayment_status.write \
        .jdbc(mysql_url, "dim_repayment_status", mode="overwrite", properties=mysql_properties)


def analyze_jie_count(spark, mysql_url, mysql_properties):
    """Analyze the number of loans issued per year."""
    tb_loans_df = spark.table("tb_loans")
    # loan_date is stored as a 'yyyy/MM/dd' string; parse it directly with
    # to_date instead of round-tripping through unix_timestamp/from_unixtime.
    dim_jie_count = tb_loans_df.withColumn(
        "loan_date_parsed", to_date(col("loan_date"), "yyyy/MM/dd")
    ).groupBy(
        year("loan_date_parsed").alias("name")
    ).agg(
        count("*").alias("value")
    ).orderBy("name")
    dim_jie_count.write \
        .jdbc(mysql_url, "dim_jie_count", mode="overwrite", properties=mysql_properties)


def main():
    spark = None
    try:
        # Initialize Spark
        spark = initialize_spark()

        # MySQL connection settings; connector 8.x uses com.mysql.cj.jdbc.Driver
        # (com.mysql.jdbc.Driver is the deprecated 5.x class name).
        mysql_properties = {
            "user": "root",
            "password": "123456",
            "driver": "com.mysql.cj.jdbc.Driver",
        }
        mysql_url = "jdbc:mysql://192.168.229.122:3306/bigdata_ibecs?useSSL=false"

        # Run each analysis task
        analyze_bank_grade(spark, mysql_url, mysql_properties)
        analyze_loan_status(spark, mysql_url, mysql_properties)
        analyze_loan_amount(spark, mysql_url, mysql_properties)
        analyze_repayment_status(spark, mysql_url, mysql_properties)
        analyze_jie_count(spark, mysql_url, mysql_properties)
    finally:
        # Stop the SparkSession even if an analysis task fails
        if spark is not None:
            spark.stop()


if __name__ == "__main__":
    main()
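
# ---------------------------------------------------------------------------
# Usage sketch (assumption: the jar path and the script filename below are
# placeholders, not confirmed by this project). Spark needs the MySQL
# connector jar on both the driver and executor classpaths; passing it via
# spark-submit --jars is generally more reliable than setting spark.jars
# from inside the application:
#
#   spark-submit \
#       --jars /path/to/mysql-connector-java-8.0.23.jar \
#       financial_analysis.py
# ---------------------------------------------------------------------------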