longport/quote/
store.rs

1use std::collections::HashMap;
2
3use longport_candlesticks::{
4    Days, TRADE_SESSION_POST, TRADE_SESSION_PRE, TradeSessionType, UpdateAction, UpdateFields,
5};
6use longport_proto::quote::Period;
7
8use crate::{
9    Market,
10    quote::{
11        Brokers, Candlestick, Depth, PushBrokers, PushDepth, PushEvent, PushTrades, SecurityBoard,
12        Trade, TradeSessions,
13        push_types::{PushEventDetail, PushQuote},
14    },
15};
16
17const MAX_TRADES: usize = 500;
18const MAX_CANDLESTICKS: usize = 500;
19
20macro_rules! merge_decimal {
21    ($prev:expr, $new:expr, $field:ident) => {
22        if !$new.$field.is_zero() {
23            $new.$field
24        } else {
25            $prev.$field
26        }
27    };
28}
29
30macro_rules! merge_i64 {
31    ($prev:expr, $new:expr, $field:ident) => {
32        if $new.$field != 0 {
33            $new.$field
34        } else {
35            $prev.$field
36        }
37    };
38}
39
40#[derive(Debug)]
41pub(crate) struct TailCandlestick {
42    pub(crate) index: usize,
43    pub(crate) candlestick: Candlestick,
44}
45
46#[derive(Debug)]
47pub(crate) struct Candlesticks {
48    pub(crate) trade_sessions: TradeSessions,
49    pub(crate) candlesticks: Vec<Candlestick>,
50    pub(crate) tails: HashMap<TradeSessionType, TailCandlestick>,
51}
52
53impl Candlesticks {
54    #[inline]
55    fn merge_input(&self, ts: TradeSessionType) -> Option<longport_candlesticks::Candlestick> {
56        self.tails.get(&ts).map(|tail| tail.candlestick.into())
57    }
58
59    pub(crate) fn insert_candlestick_by_time(&mut self, candlestick: Candlestick) -> usize {
60        let timestamp = candlestick.timestamp;
61        match self
62            .candlesticks
63            .binary_search_by_key(&timestamp, |c| c.timestamp)
64        {
65            Ok(index) => {
66                self.candlesticks[index] = candlestick;
67                index
68            }
69            Err(index) => {
70                self.candlesticks.insert(index, candlestick);
71                index
72            }
73        }
74    }
75
76    pub(crate) fn merge_trade<H>(
77        &mut self,
78        ts: TradeSessionType,
79        market_type: Market,
80        half_days: H,
81        board: SecurityBoard,
82        period: Period,
83        trade: &Trade,
84    ) -> UpdateAction
85    where
86        H: Days,
87    {
88        let Some(market) = get_market(market_type, board) else {
89            return UpdateAction::None;
90        };
91        let period = convert_period(period);
92
93        let trade_type = trade.trade_type.as_str();
94        let update_fields = match market_type {
95            Market::Unknown => unreachable!(),
96            Market::HK => match trade_type {
97                "" => UpdateFields::all(),
98                "D" => UpdateFields::VOLUME,
99                "M" => UpdateFields::VOLUME,
100                "P" => UpdateFields::VOLUME,
101                "U" => UpdateFields::all(),
102                "X" => UpdateFields::VOLUME,
103                "Y" => UpdateFields::VOLUME,
104                _ => UpdateFields::empty(),
105            },
106            Market::US => match trade_type {
107                "" => UpdateFields::all(),
108                "A" => UpdateFields::all(),
109                "B" => UpdateFields::all(),
110                "C" => UpdateFields::VOLUME,
111                "D" => UpdateFields::all(),
112                "E" => UpdateFields::all(),
113                "F" => UpdateFields::all(),
114                "G" => UpdateFields::VOLUME,
115                "H" => UpdateFields::VOLUME,
116                "I" if board == SecurityBoard::USOption || board == SecurityBoard::USOptionS => {
117                    UpdateFields::all()
118                }
119                "I" if ts == TRADE_SESSION_PRE || ts == TRADE_SESSION_POST => UpdateFields::all(),
120                "I" => UpdateFields::VOLUME,
121                "K" => UpdateFields::all(),
122                "M" => UpdateFields::empty(),
123                "P" => UpdateFields::empty(),
124                "S" => UpdateFields::all(),
125                "V" => UpdateFields::VOLUME,
126                "W" => UpdateFields::VOLUME,
127                "X" => UpdateFields::all(),
128                "1" => UpdateFields::all(),
129                _ => UpdateFields::empty(),
130            },
131            Market::CN | Market::SG => UpdateFields::all(),
132        };
133
134        market.merge_trade(
135            ts,
136            half_days,
137            period,
138            self.merge_input(ts),
139            longport_candlesticks::Trade {
140                time: trade.timestamp,
141                price: trade.price,
142                volume: trade.volume,
143                update_fields,
144            },
145        )
146    }
147
148    pub(crate) fn merge_quote<H>(
149        &mut self,
150        ts: TradeSessionType,
151        market_type: Market,
152        half_days: H,
153        board: SecurityBoard,
154        period: Period,
155        push_quote: &PushQuote,
156    ) -> UpdateAction
157    where
158        H: Days,
159    {
160        debug_assert!(period == Period::Day);
161
162        let Some(market) = get_market(market_type, board) else {
163            return UpdateAction::None;
164        };
165        let period = convert_period(period);
166
167        market.merge_quote(
168            ts,
169            half_days,
170            period,
171            self.merge_input(ts),
172            longport_candlesticks::Quote {
173                time: push_quote.timestamp,
174                open: push_quote.open,
175                high: push_quote.high,
176                low: push_quote.low,
177                last_done: push_quote.last_done,
178                volume: push_quote.volume,
179                turnover: push_quote.turnover,
180            },
181        )
182    }
183
184    pub(crate) fn check_and_remove(&mut self) {
185        if self.candlesticks.len() <= MAX_CANDLESTICKS * 2 {
186            return;
187        }
188
189        let remove_count = self.candlesticks.len() - MAX_CANDLESTICKS;
190        let mut remove_tails = vec![];
191
192        for (ts, tail) in &mut self.tails {
193            if tail.index >= remove_count {
194                tail.index -= remove_count;
195            } else {
196                remove_tails.push(*ts);
197            }
198        }
199
200        for ts in remove_tails {
201            self.tails.remove(&ts);
202        }
203
204        self.candlesticks.drain(..remove_count);
205    }
206}
207
208#[derive(Debug, Default)]
209pub(crate) struct SecuritiesData {
210    pub(crate) quote: PushQuote,
211
212    pub(crate) asks: Vec<Depth>,
213    pub(crate) bids: Vec<Depth>,
214
215    pub(crate) ask_brokers: Vec<Brokers>,
216    pub(crate) bid_brokers: Vec<Brokers>,
217
218    pub(crate) trades: Vec<Trade>,
219
220    pub(crate) board: SecurityBoard,
221    pub(crate) candlesticks: HashMap<Period, Candlesticks>,
222}
223
224#[derive(Debug, Default)]
225pub(crate) struct Store {
226    pub(crate) securities: HashMap<String, SecuritiesData>,
227}
228
229impl Store {
230    pub(crate) fn handle_push(&mut self, event: &mut PushEvent) {
231        let data = self.securities.entry(event.symbol.clone()).or_default();
232
233        match &mut event.detail {
234            PushEventDetail::Quote(quote) => merge_quote(data, quote),
235            PushEventDetail::Depth(depth) => merge_depth(data, depth),
236            PushEventDetail::Brokers(brokers) => merge_brokers(data, brokers),
237            PushEventDetail::Trade(trade) => merge_trades(data, trade),
238            PushEventDetail::Candlestick(_) => unreachable!(),
239        }
240    }
241}
242
243fn merge_quote(data: &mut SecuritiesData, quote: &mut PushQuote) {
244    let prev_quote = &data.quote;
245    let new_quote = PushQuote {
246        last_done: merge_decimal!(prev_quote, quote, last_done),
247        open: merge_decimal!(prev_quote, quote, open),
248        high: merge_decimal!(prev_quote, quote, high),
249        low: merge_decimal!(prev_quote, quote, low),
250        timestamp: quote.timestamp,
251        volume: merge_i64!(prev_quote, quote, volume),
252        turnover: merge_decimal!(prev_quote, quote, turnover),
253        trade_status: quote.trade_status,
254        trade_session: quote.trade_session,
255        current_volume: quote.current_volume,
256        current_turnover: quote.current_turnover,
257    };
258    data.quote = new_quote.clone();
259    *quote = new_quote;
260}
261
262fn merge_depth(data: &mut SecuritiesData, depth: &PushDepth) {
263    replace(&mut data.asks, depth.asks.clone(), |v| v.position);
264    replace(&mut data.bids, depth.bids.clone(), |v| v.position);
265}
266
267fn merge_brokers(data: &mut SecuritiesData, brokers: &PushBrokers) {
268    replace(&mut data.ask_brokers, brokers.ask_brokers.clone(), |v| {
269        v.position
270    });
271    replace(&mut data.bid_brokers, brokers.bid_brokers.clone(), |v| {
272        v.position
273    });
274}
275
276fn merge_trades(data: &mut SecuritiesData, trades: &PushTrades) {
277    data.trades.extend(trades.trades.clone());
278    if data.trades.len() > MAX_TRADES * 2 {
279        data.trades.drain(..MAX_TRADES);
280    }
281}
282
283fn replace<T, B, F>(elements: &mut Vec<T>, others: Vec<T>, f: F)
284where
285    F: Fn(&T) -> B,
286    B: Ord,
287{
288    for v in others {
289        match elements.binary_search_by_key(&f(&v), &f) {
290            Ok(index) => elements[index] = v,
291            Err(index) => elements.insert(index, v),
292        }
293    }
294}
295
296pub(crate) fn get_market(
297    market: Market,
298    board: SecurityBoard,
299) -> Option<&'static longport_candlesticks::Market> {
300    use longport_candlesticks::markets::*;
301
302    Some(match market {
303        Market::US if board == SecurityBoard::USOptionS => &US_OPTION,
304        Market::US => &US,
305        Market::HK => &HK,
306        Market::SG => &SG,
307        Market::CN => &CN,
308        Market::Unknown => return None,
309    })
310}
311
312fn convert_period(period: Period) -> longport_candlesticks::Period {
313    use longport_candlesticks::Period::*;
314
315    match period {
316        Period::UnknownPeriod => unreachable!(),
317        Period::OneMinute => Min_1,
318        Period::TwoMinute => Min_2,
319        Period::ThreeMinute => Min_3,
320        Period::FiveMinute => Min_5,
321        Period::TenMinute => Min_10,
322        Period::FifteenMinute => Min_15,
323        Period::TwentyMinute => Min_20,
324        Period::ThirtyMinute => Min_30,
325        Period::FortyFiveMinute => Min_45,
326        Period::SixtyMinute => Min_60,
327        Period::TwoHour => Min_120,
328        Period::ThreeHour => Min_180,
329        Period::FourHour => Min_240,
330        Period::Day => Day,
331        Period::Week => Week,
332        Period::Month => Month,
333        Period::Quarter => Quarter,
334        Period::Year => Year,
335    }
336}