zhou zhou
9 小时以前 25f0001a7e76d0565fa9de0651f1177b9f61472f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package com.vincent.rsf.server.manager.partition;
 
import com.vincent.rsf.framework.exception.CoolException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
 
/**
 * Partition support for ASN history log tables.
 */
@Component
public class AsnLogPartitionSupport {
 
    public static final String ORDER_LOG_TABLE = "man_asn_order_log";
    public static final String ORDER_ITEM_LOG_TABLE = "man_asn_order_item_log";
 
    private static final long CACHE_TTL_MILLIS = TimeUnit.MINUTES.toMillis(5);
    private static final String LIST_TABLE_SQL =
            "select table_name from information_schema.tables " +
                    "where table_schema = database() and (table_name = ? or table_name like ? escape '\\\\')";
 
    private final JdbcTemplate jdbcTemplate;
    private final Map<String, TableCacheEntry> tableCache = new ConcurrentHashMap<>();
 
    public AsnLogPartitionSupport(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
 
    public String resolveRoutedTable(String logicalTable) {
        String routed = AsnLogTableRoutingContext.getTable(logicalTable);
        return routed == null ? logicalTable : routed;
    }
 
    public String resolveOrderLogTable(Date createTime) {
        return resolvePhysicalTable(ORDER_LOG_TABLE, createTime);
    }
 
    public String resolveOrderItemLogTable(Date createTime) {
        return resolvePhysicalTable(ORDER_ITEM_LOG_TABLE, createTime);
    }
 
    public String resolvePhysicalTable(String logicalTable, Date createTime) {
        Date effectiveDate = createTime == null ? new Date() : createTime;
        LocalDateTime localDateTime = LocalDateTime.ofInstant(effectiveDate.toInstant(), ZoneId.systemDefault());
        int half = localDateTime.getMonthValue() <= 6 ? 1 : 2;
        return logicalTable + "_" + localDateTime.getYear() + "_h" + half;
    }
 
    public List<String> listOrderLogTables() {
        return listReadableTables(ORDER_LOG_TABLE);
    }
 
    public List<String> listOrderItemLogTables() {
        return listReadableTables(ORDER_ITEM_LOG_TABLE);
    }
 
    public List<String> listReadableTables(String logicalTable) {
        TableCacheEntry cacheEntry = tableCache.get(logicalTable);
        long now = System.currentTimeMillis();
        if (cacheEntry != null && now - cacheEntry.loadedAt < CACHE_TTL_MILLIS) {
            return cacheEntry.tables;
        }
        return refreshReadableTables(logicalTable);
    }
 
    public void ensureTableExists(String logicalTable, String actualTable) {
        List<String> tables = refreshReadableTables(logicalTable);
        if (!tables.contains(actualTable)) {
            throw new CoolException("历史日志分表不存在,请先创建表:" + actualTable);
        }
    }
 
    public <T> T executeOnTable(String logicalTable, String actualTable, Supplier<T> supplier) {
        return AsnLogTableRoutingContext.withTable(logicalTable, actualTable, supplier);
    }
 
    public void runOnTable(String logicalTable, String actualTable, Runnable runnable) {
        AsnLogTableRoutingContext.withTable(logicalTable, actualTable, runnable);
    }
 
    private List<String> refreshReadableTables(String logicalTable) {
        try {
            List<String> tables = jdbcTemplate.queryForList(
                    LIST_TABLE_SQL,
                    String.class,
                    logicalTable,
                    logicalTable + "\\_%"
            );
            if (tables == null || tables.isEmpty()) {
                tables = new ArrayList<>(Collections.singletonList(logicalTable));
            }
            tables.sort(tableComparator(logicalTable));
            tableCache.put(logicalTable, new TableCacheEntry(Collections.unmodifiableList(new ArrayList<>(tables)), System.currentTimeMillis()));
            return tableCache.get(logicalTable).tables;
        } catch (Exception ex) {
            List<String> fallback = Collections.singletonList(logicalTable);
            tableCache.put(logicalTable, new TableCacheEntry(fallback, System.currentTimeMillis()));
            return fallback;
        }
    }
 
    private Comparator<String> tableComparator(String logicalTable) {
        return (left, right) -> {
            boolean leftBase = logicalTable.equals(left);
            boolean rightBase = logicalTable.equals(right);
            if (leftBase && rightBase) {
                return 0;
            }
            if (leftBase) {
                return 1;
            }
            if (rightBase) {
                return -1;
            }
            return right.compareTo(left);
        };
    }
 
    private static class TableCacheEntry {
        private final List<String> tables;
        private final long loadedAt;
 
        private TableCacheEntry(List<String> tables, long loadedAt) {
            this.tables = tables;
            this.loadedAt = loadedAt;
        }
    }
}