longport/quote/
store.rs

1use std::collections::HashMap;
2
3use longport_candlesticks::{Days, UpdateAction, UpdateFields};
4use longport_proto::quote::Period;
5
6use crate::{
7    Market,
8    quote::{
9        Brokers, Candlestick, Depth, PushBrokers, PushDepth, PushEvent, PushTrades, SecurityBoard,
10        Trade, TradeDirection, TradeSession, TradeSessions,
11        push_types::{PushEventDetail, PushQuote},
12    },
13};
14
15const MAX_TRADES: usize = 500;
16const MAX_CANDLESTICKS: usize = 500;
17
18macro_rules! merge_decimal {
19    ($prev:expr, $new:expr, $field:ident) => {
20        if !$new.$field.is_zero() {
21            $new.$field
22        } else {
23            $prev.$field
24        }
25    };
26}
27
28macro_rules! merge_i64 {
29    ($prev:expr, $new:expr, $field:ident) => {
30        if $new.$field != 0 {
31            $new.$field
32        } else {
33            $prev.$field
34        }
35    };
36}
37
38#[derive(Debug)]
39pub(crate) struct TailCandlestick {
40    pub(crate) index: usize,
41    pub(crate) candlestick: Candlestick,
42}
43
44#[derive(Debug)]
45pub(crate) struct Candlesticks {
46    pub(crate) trade_sessions: TradeSessions,
47    pub(crate) candlesticks: Vec<Candlestick>,
48    pub(crate) tails: HashMap<TradeSession, TailCandlestick>,
49}
50
51impl Candlesticks {
52    #[inline]
53    fn merge_input(&self, ts: TradeSession) -> Option<Candlestick> {
54        self.tails.get(&ts).map(|tail| tail.candlestick)
55    }
56
57    pub(crate) fn insert_candlestick_by_time(&mut self, candlestick: Candlestick) -> usize {
58        let timestamp = candlestick.timestamp;
59        match self
60            .candlesticks
61            .binary_search_by_key(&timestamp, |c| c.timestamp)
62        {
63            Ok(index) => {
64                self.candlesticks[index] = candlestick;
65                index
66            }
67            Err(index) => {
68                self.candlesticks.insert(index, candlestick);
69                index
70            }
71        }
72    }
73
74    pub(crate) fn merge_trade<H>(
75        &mut self,
76        market_type: Market,
77        half_days: H,
78        board: SecurityBoard,
79        period: Period,
80        trade: &Trade,
81    ) -> UpdateAction<Candlestick>
82    where
83        H: Days,
84    {
85        let Some(market) = get_market(market_type, board) else {
86            return UpdateAction::None;
87        };
88        let ts = trade.trade_session;
89        let period = convert_period(period);
90
91        let trade_type = trade.trade_type.as_str();
92        let update_fields = match market_type {
93            Market::Unknown => unreachable!(),
94            Market::HK => match trade_type {
95                "" => UpdateFields::all(),
96                "D" => UpdateFields::VOLUME,
97                "M" => UpdateFields::VOLUME,
98                "P" => UpdateFields::VOLUME,
99                "U" => UpdateFields::all(),
100                "X" => UpdateFields::VOLUME,
101                "Y" => UpdateFields::VOLUME,
102                _ => UpdateFields::empty(),
103            },
104            Market::US => match trade_type {
105                "" => UpdateFields::all(),
106                "A" => UpdateFields::all(),
107                "B" => UpdateFields::all(),
108                "C" => UpdateFields::VOLUME,
109                "D" => UpdateFields::all(),
110                "E" => UpdateFields::all(),
111                "F" => UpdateFields::all(),
112                "G" => UpdateFields::VOLUME,
113                "H" => UpdateFields::VOLUME,
114                "I" if board == SecurityBoard::USOption || board == SecurityBoard::USOptionS => {
115                    UpdateFields::all()
116                }
117                "I" if ts == TradeSession::Pre || ts == TradeSession::Post => UpdateFields::all(),
118                "I" => UpdateFields::VOLUME,
119                "K" => UpdateFields::all(),
120                "M" => UpdateFields::empty(),
121                "P" => UpdateFields::empty(),
122                "S" => UpdateFields::all(),
123                "V" => UpdateFields::VOLUME,
124                "W" => UpdateFields::VOLUME,
125                "X" => UpdateFields::all(),
126                "1" => UpdateFields::all(),
127                _ => UpdateFields::empty(),
128            },
129            Market::CN | Market::SG | Market::Crypto => UpdateFields::all(),
130        };
131
132        market.merge_trade(
133            half_days,
134            period,
135            self.merge_input(ts),
136            trade,
137            update_fields,
138        )
139    }
140
141    pub(crate) fn merge_quote_day(
142        &mut self,
143        market_type: Market,
144        board: SecurityBoard,
145        push_quote: &PushQuote,
146    ) -> UpdateAction<Candlestick> {
147        let Some(market) = get_market(market_type, board) else {
148            return UpdateAction::None;
149        };
150        let ts = push_quote.trade_session;
151        market.merge_quote_day(self.merge_input(ts), push_quote)
152    }
153
154    pub(crate) fn merge_quote<H>(
155        &mut self,
156        market_type: Market,
157        half_days: H,
158        board: SecurityBoard,
159        period: Period,
160        push_quote: &PushQuote,
161    ) -> UpdateAction<Candlestick>
162    where
163        H: Days,
164    {
165        let Some(market) = get_market(market_type, board) else {
166            return UpdateAction::None;
167        };
168        let ts = push_quote.trade_session;
169        let period = convert_period(period);
170        market.merge_trade(
171            half_days,
172            period,
173            self.merge_input(ts),
174            &Trade {
175                price: push_quote.last_done,
176                volume: push_quote.current_volume,
177                timestamp: push_quote.timestamp,
178                trade_type: String::new(),
179                direction: TradeDirection::Neutral,
180                trade_session: push_quote.trade_session,
181            },
182            UpdateFields::all(),
183        )
184    }
185
186    pub(crate) fn check_and_remove(&mut self) {
187        if self.candlesticks.len() <= MAX_CANDLESTICKS * 2 {
188            return;
189        }
190
191        let remove_count = self.candlesticks.len() - MAX_CANDLESTICKS;
192        let mut remove_tails = vec![];
193
194        for (ts, tail) in &mut self.tails {
195            if tail.index >= remove_count {
196                tail.index -= remove_count;
197            } else {
198                remove_tails.push(*ts);
199            }
200        }
201
202        for ts in remove_tails {
203            self.tails.remove(&ts);
204        }
205
206        self.candlesticks.drain(..remove_count);
207    }
208}
209
210#[derive(Debug, Default)]
211pub(crate) struct SecuritiesData {
212    pub(crate) quote: PushQuote,
213
214    pub(crate) asks: Vec<Depth>,
215    pub(crate) bids: Vec<Depth>,
216
217    pub(crate) ask_brokers: Vec<Brokers>,
218    pub(crate) bid_brokers: Vec<Brokers>,
219
220    pub(crate) trades: Vec<Trade>,
221
222    pub(crate) board: SecurityBoard,
223    pub(crate) candlesticks: HashMap<Period, Candlesticks>,
224}
225
226#[derive(Debug, Default)]
227pub(crate) struct Store {
228    pub(crate) securities: HashMap<String, SecuritiesData>,
229}
230
231impl Store {
232    pub(crate) fn handle_push(&mut self, event: &mut PushEvent) {
233        let data = self.securities.entry(event.symbol.clone()).or_default();
234
235        match &mut event.detail {
236            PushEventDetail::Quote(quote) => merge_quote(data, quote),
237            PushEventDetail::Depth(depth) => merge_depth(data, depth),
238            PushEventDetail::Brokers(brokers) => merge_brokers(data, brokers),
239            PushEventDetail::Trade(trade) => merge_trades(data, trade),
240            PushEventDetail::Candlestick(_) => unreachable!(),
241        }
242    }
243}
244
245fn merge_quote(data: &mut SecuritiesData, quote: &mut PushQuote) {
246    let prev_quote = &data.quote;
247    let new_quote = PushQuote {
248        last_done: merge_decimal!(prev_quote, quote, last_done),
249        open: merge_decimal!(prev_quote, quote, open),
250        high: merge_decimal!(prev_quote, quote, high),
251        low: merge_decimal!(prev_quote, quote, low),
252        timestamp: quote.timestamp,
253        volume: merge_i64!(prev_quote, quote, volume),
254        turnover: merge_decimal!(prev_quote, quote, turnover),
255        trade_status: quote.trade_status,
256        trade_session: quote.trade_session,
257        current_volume: quote.current_volume,
258        current_turnover: quote.current_turnover,
259    };
260    data.quote = new_quote.clone();
261    *quote = new_quote;
262}
263
264fn merge_depth(data: &mut SecuritiesData, depth: &PushDepth) {
265    replace(&mut data.asks, depth.asks.clone(), |v| v.position);
266    replace(&mut data.bids, depth.bids.clone(), |v| v.position);
267}
268
269fn merge_brokers(data: &mut SecuritiesData, brokers: &PushBrokers) {
270    replace(&mut data.ask_brokers, brokers.ask_brokers.clone(), |v| {
271        v.position
272    });
273    replace(&mut data.bid_brokers, brokers.bid_brokers.clone(), |v| {
274        v.position
275    });
276}
277
278fn merge_trades(data: &mut SecuritiesData, trades: &PushTrades) {
279    data.trades.extend(trades.trades.clone());
280    if data.trades.len() > MAX_TRADES * 2 {
281        data.trades.drain(..MAX_TRADES);
282    }
283}
284
285fn replace<T, B, F>(elements: &mut Vec<T>, others: Vec<T>, f: F)
286where
287    F: Fn(&T) -> B,
288    B: Ord,
289{
290    for v in others {
291        match elements.binary_search_by_key(&f(&v), &f) {
292            Ok(index) => elements[index] = v,
293            Err(index) => elements.insert(index, v),
294        }
295    }
296}
297
298pub(crate) fn get_market(
299    market: Market,
300    board: SecurityBoard,
301) -> Option<&'static longport_candlesticks::Market> {
302    use longport_candlesticks::markets::*;
303
304    Some(match market {
305        Market::US if board == SecurityBoard::USOptionS => &US_OPTION,
306        Market::US => &US,
307        Market::HK => &HK,
308        Market::SG => &SG,
309        Market::CN => &CN,
310        Market::Unknown | Market::Crypto => return None,
311    })
312}
313
314fn convert_period(period: Period) -> longport_candlesticks::Period {
315    use longport_candlesticks::Period::*;
316
317    match period {
318        Period::UnknownPeriod => unreachable!(),
319        Period::OneMinute => Min_1,
320        Period::TwoMinute => Min_2,
321        Period::ThreeMinute => Min_3,
322        Period::FiveMinute => Min_5,
323        Period::TenMinute => Min_10,
324        Period::FifteenMinute => Min_15,
325        Period::TwentyMinute => Min_20,
326        Period::ThirtyMinute => Min_30,
327        Period::FortyFiveMinute => Min_45,
328        Period::SixtyMinute => Min_60,
329        Period::TwoHour => Min_120,
330        Period::ThreeHour => Min_180,
331        Period::FourHour => Min_240,
332        Period::Day => Day,
333        Period::Week => Week,
334        Period::Month => Month,
335        Period::Quarter => Quarter,
336        Period::Year => Year,
337    }
338}