java批量写入hive

5/12/2025 Java 通过 Hive JDBC (HiveServer2) 批量写入示例

注意：由于是通过 JDBC 连接 HiveServer2（而非嵌入式/MetaStore 方式），resources 目录下不需要放置 Hadoop 和 Hive 的配置文件（core-site.xml、hdfs-site.xml、hive-site.xml 等），只需 JDBC URL 可达即可。

package com.exam;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class HiveBatchImport {

    // Hive JDBC connection configuration (HiveServer2 on host "master", default DB).
    private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
    private static final String URL = "jdbc:hive2://master:10000/default";
    private static final String USER = "root";
    private static final String PASSWORD = "";

    /**
     * Reads user rows from a CSV file and bulk-inserts them into Hive table
     * {@code t1} via multi-row INSERT statements over JDBC.
     *
     * @param args optional: {@code args[0]} overrides the default CSV path
     *             {@code "data/data.csv"}
     */
    public static void main(String[] args) {
        String csvPath = args.length > 0 ? args[0] : "data/data.csv";
        try {
            // 1. Load the Hive JDBC driver (required by older driver versions).
            Class.forName(DRIVER_NAME);

            // 2. try-with-resources guarantees statement/connection are closed
            //    even on failure (the original manual finally-close could leak
            //    if createStatement threw between the two close calls).
            try (Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
                 Statement statement = connection.createStatement()) {

                // 3. Disable automatic statistics gathering to speed up bulk
                //    inserts; reuse the one Statement instead of creating a
                //    throwaway one as the original did.
                statement.execute("set hive.stats.autogather=false");

                // 4. Read CSV, then insert in batches of 1000 rows per statement.
                List<User> users = readDataFromCsv(csvPath);
                int imported = insertInBatches(statement, users, 1000);

                System.out.println("成功导入 " + imported + " 条数据到Hive表");
            }
        } catch (Exception e) {
            // NOTE: HiveServer2 JDBC does not support transactions here —
            // rollback()/setAutoCommit() throw "Method not supported", so the
            // original rollback attempt could never succeed. Rows from batches
            // that already executed remain in the table.
            e.printStackTrace();
        }
    }

    /**
     * Executes multi-row INSERT statements, {@code batchSize} rows each.
     *
     * @param statement open statement on the Hive connection
     * @param users     rows to insert
     * @param batchSize number of rows per INSERT statement
     * @return total number of rows submitted
     * @throws SQLException if an INSERT fails (earlier batches are not undone)
     */
    private static int insertInBatches(Statement statement, List<User> users, int batchSize)
            throws SQLException {
        int count = 0;
        StringBuilder sqlBatch = new StringBuilder();
        for (User user : users) {
            if (sqlBatch.length() == 0) {
                sqlBatch.append("INSERT INTO TABLE t1 (user_id, birthday, gender) VALUES ");
            }
            // user_id/gender are numeric (parsed long/int); birthday is the only
            // string value and is quote-escaped before embedding.
            sqlBatch.append("(")
                    .append(user.getUserId()).append(", '")
                    .append(escapeSql(user.getBirthday())).append("', ")
                    .append(user.getGender()).append("),");

            if (++count % batchSize == 0) {
                sqlBatch.deleteCharAt(sqlBatch.length() - 1); // drop trailing comma
                statement.execute(sqlBatch.toString());
                sqlBatch.setLength(0); // reset buffer for the next batch
                System.out.println("已处理 " + count + " 条记录");
            }
        }
        // Flush the final, possibly partial batch.
        if (sqlBatch.length() > 0) {
            sqlBatch.deleteCharAt(sqlBatch.length() - 1);
            statement.execute(sqlBatch.toString());
        }
        return count;
    }

    /**
     * Parses users from a CSV file with a header row and at least three
     * comma-separated columns per line: user_id (long), birthday (string),
     * gender (int). Lines with malformed numbers are logged and skipped.
     *
     * @param filePath path to the CSV file
     * @return parsed users (empty list if the file has only a header)
     * @throws IOException if the file cannot be read
     */
    private static List<User> readDataFromCsv(String filePath) throws IOException {
        List<User> users = new ArrayList<>();
        // Explicit UTF-8: the original FileReader used the platform default
        // charset, which corrupts non-ASCII data on mismatched systems.
        try (BufferedReader br = Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)) {
            String line;
            boolean isHeader = true;

            while ((line = br.readLine()) != null) {
                if (isHeader) {
                    isHeader = false;
                    continue; // skip header row
                }

                String[] parts = line.trim().split(",");
                if (parts.length >= 3) { // require all three columns
                    try {
                        long userId = Long.parseLong(parts[0]);
                        String birthday = parts[1];
                        int gender = Integer.parseInt(parts[2]);
                        users.add(new User(userId, birthday, gender));
                    } catch (NumberFormatException e) {
                        System.err.println("解析数据行时出错: " + line);
                        e.printStackTrace();
                    }
                }
            }
        }
        return users;
    }

    /**
     * Doubles single quotes so {@code input} can be embedded inside a SQL
     * string literal. Returns {@code null} unchanged.
     */
    private static String escapeSql(String input) {
        if (input == null) {
            return null;
        }
        return input.replace("'", "''");
    }

    /** Immutable holder for one CSV row: user_id, birthday, gender. */
    static class User {
        private final Long userId;
        private final String birthday;
        private final int gender;

        public User(Long userId, String birthday, int gender) {
            this.userId = userId;
            this.birthday = birthday;
            this.gender = gender;
        }

        public Long getUserId() {
            return userId;
        }

        public String getBirthday() {
            return birthday;
        }

        public int getGender() {
            return gender;
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159

pom.xml


        <!-- Hive JDBC 驱动 -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-slf4j-impl</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Hadoop 公共库 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.classic.version}</version>
        </dependency>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Last Updated: 5/12/2025, 3:58:13 PM