1use std::collections::HashMap;
2
3use longport_candlesticks::{
4 Days, TRADE_SESSION_POST, TRADE_SESSION_PRE, TradeSessionType, UpdateAction, UpdateFields,
5};
6use longport_proto::quote::Period;
7
8use crate::{
9 Market,
10 quote::{
11 Brokers, Candlestick, Depth, PushBrokers, PushDepth, PushEvent, PushTrades, SecurityBoard,
12 Trade, TradeSessions,
13 push_types::{PushEventDetail, PushQuote},
14 },
15};
16
// Trimming unit for a security's buffered trades: once more than
// `MAX_TRADES * 2` trades are held, `merge_trades` discards the oldest ones.
const MAX_TRADES: usize = 500;
// Number of candlesticks kept after trimming: once a series holds more than
// `MAX_CANDLESTICKS * 2` entries, `Candlesticks::check_and_remove` cuts it
// back to the newest `MAX_CANDLESTICKS`.
const MAX_CANDLESTICKS: usize = 500;
19
/// Picks the value of `$field` to keep when merging `$new` over `$prev`.
///
/// Expands to `$new.$field` unless that value reports `is_zero()`, in which
/// case the previously stored `$prev.$field` is kept instead.
macro_rules! merge_decimal {
    ($prev:expr, $new:expr, $field:ident) => {
        if $new.$field.is_zero() {
            // A zero in the update carries no information; keep the old value.
            $prev.$field
        } else {
            $new.$field
        }
    };
}
29
/// Picks the integer value of `$field` to keep when merging `$new` over `$prev`.
///
/// Expands to `$new.$field` unless it is `0`, in which case the previously
/// stored `$prev.$field` is kept instead.
macro_rules! merge_i64 {
    ($prev:expr, $new:expr, $field:ident) => {
        match $new.$field {
            // A zero in the update carries no information; keep the old value.
            0 => $prev.$field,
            fresh => fresh,
        }
    };
}
39
/// The tail candlestick tracked for one trade session of a series.
#[derive(Debug)]
pub(crate) struct TailCandlestick {
    /// Position of this candlestick inside `Candlesticks::candlesticks`;
    /// `Candlesticks::check_and_remove` shifts it when older entries are dropped.
    pub(crate) index: usize,
    /// The candlestick value; used as the merge input for new trades and quotes.
    pub(crate) candlestick: Candlestick,
}
45
/// Candlestick series kept for one security and period.
#[derive(Debug)]
pub(crate) struct Candlesticks {
    /// Trade sessions associated with this series.
    /// NOTE(review): not read by the code in this file; presumably the
    /// sessions the subscription was created with — confirm against callers.
    pub(crate) trade_sessions: TradeSessions,
    /// The stored candlesticks; `insert_candlestick_by_time` keeps them in
    /// ascending timestamp order.
    pub(crate) candlesticks: Vec<Candlestick>,
    /// Tail candlestick per trade session type.
    pub(crate) tails: HashMap<TradeSessionType, TailCandlestick>,
}
52
53impl Candlesticks {
54 #[inline]
55 fn merge_input(&self, ts: TradeSessionType) -> Option<longport_candlesticks::Candlestick> {
56 self.tails.get(&ts).map(|tail| tail.candlestick.into())
57 }
58
59 pub(crate) fn insert_candlestick_by_time(&mut self, candlestick: Candlestick) -> usize {
60 let timestamp = candlestick.timestamp;
61 match self
62 .candlesticks
63 .binary_search_by_key(×tamp, |c| c.timestamp)
64 {
65 Ok(index) => {
66 self.candlesticks[index] = candlestick;
67 index
68 }
69 Err(index) => {
70 self.candlesticks.insert(index, candlestick);
71 index
72 }
73 }
74 }
75
76 pub(crate) fn merge_trade<H>(
77 &mut self,
78 ts: TradeSessionType,
79 market_type: Market,
80 half_days: H,
81 board: SecurityBoard,
82 period: Period,
83 trade: &Trade,
84 ) -> UpdateAction
85 where
86 H: Days,
87 {
88 let Some(market) = get_market(market_type, board) else {
89 return UpdateAction::None;
90 };
91 let period = convert_period(period);
92
93 let trade_type = trade.trade_type.as_str();
94 let update_fields = match market_type {
95 Market::Unknown => unreachable!(),
96 Market::HK => match trade_type {
97 "" => UpdateFields::all(),
98 "D" => UpdateFields::VOLUME,
99 "M" => UpdateFields::VOLUME,
100 "P" => UpdateFields::VOLUME,
101 "U" => UpdateFields::all(),
102 "X" => UpdateFields::VOLUME,
103 "Y" => UpdateFields::VOLUME,
104 _ => UpdateFields::empty(),
105 },
106 Market::US => match trade_type {
107 "" => UpdateFields::all(),
108 "A" => UpdateFields::all(),
109 "B" => UpdateFields::all(),
110 "C" => UpdateFields::VOLUME,
111 "D" => UpdateFields::all(),
112 "E" => UpdateFields::all(),
113 "F" => UpdateFields::all(),
114 "G" => UpdateFields::VOLUME,
115 "H" => UpdateFields::VOLUME,
116 "I" if board == SecurityBoard::USOption || board == SecurityBoard::USOptionS => {
117 UpdateFields::all()
118 }
119 "I" if ts == TRADE_SESSION_PRE || ts == TRADE_SESSION_POST => UpdateFields::all(),
120 "I" => UpdateFields::VOLUME,
121 "K" => UpdateFields::all(),
122 "M" => UpdateFields::empty(),
123 "P" => UpdateFields::empty(),
124 "S" => UpdateFields::all(),
125 "V" => UpdateFields::VOLUME,
126 "W" => UpdateFields::VOLUME,
127 "X" => UpdateFields::all(),
128 "1" => UpdateFields::all(),
129 _ => UpdateFields::empty(),
130 },
131 Market::CN | Market::SG => UpdateFields::all(),
132 };
133
134 market.merge_trade(
135 ts,
136 half_days,
137 period,
138 self.merge_input(ts),
139 longport_candlesticks::Trade {
140 time: trade.timestamp,
141 price: trade.price,
142 volume: trade.volume,
143 update_fields,
144 },
145 )
146 }
147
148 pub(crate) fn merge_quote<H>(
149 &mut self,
150 ts: TradeSessionType,
151 market_type: Market,
152 half_days: H,
153 board: SecurityBoard,
154 period: Period,
155 push_quote: &PushQuote,
156 ) -> UpdateAction
157 where
158 H: Days,
159 {
160 debug_assert!(period == Period::Day);
161
162 let Some(market) = get_market(market_type, board) else {
163 return UpdateAction::None;
164 };
165 let period = convert_period(period);
166
167 market.merge_quote(
168 ts,
169 half_days,
170 period,
171 self.merge_input(ts),
172 longport_candlesticks::Quote {
173 time: push_quote.timestamp,
174 open: push_quote.open,
175 high: push_quote.high,
176 low: push_quote.low,
177 last_done: push_quote.last_done,
178 volume: push_quote.volume,
179 turnover: push_quote.turnover,
180 },
181 )
182 }
183
184 pub(crate) fn check_and_remove(&mut self) {
185 if self.candlesticks.len() <= MAX_CANDLESTICKS * 2 {
186 return;
187 }
188
189 let remove_count = self.candlesticks.len() - MAX_CANDLESTICKS;
190 let mut remove_tails = vec![];
191
192 for (ts, tail) in &mut self.tails {
193 if tail.index >= remove_count {
194 tail.index -= remove_count;
195 } else {
196 remove_tails.push(*ts);
197 }
198 }
199
200 for ts in remove_tails {
201 self.tails.remove(&ts);
202 }
203
204 self.candlesticks.drain(..remove_count);
205 }
206}
207
/// Everything the store keeps for a single security.
#[derive(Debug, Default)]
pub(crate) struct SecuritiesData {
    /// Latest quote, as produced by `merge_quote` from successive pushes.
    pub(crate) quote: PushQuote,

    /// Ask-side depth entries; `merge_depth` inserts/overwrites them by `position`.
    pub(crate) asks: Vec<Depth>,
    /// Bid-side depth entries; `merge_depth` inserts/overwrites them by `position`.
    pub(crate) bids: Vec<Depth>,

    /// Ask-side broker entries; `merge_brokers` inserts/overwrites them by `position`.
    pub(crate) ask_brokers: Vec<Brokers>,
    /// Bid-side broker entries; `merge_brokers` inserts/overwrites them by `position`.
    pub(crate) bid_brokers: Vec<Brokers>,

    /// Pushed trades in arrival order; trimmed by `merge_trades` once the
    /// buffer grows past `MAX_TRADES * 2`.
    pub(crate) trades: Vec<Trade>,

    /// Board of the security.
    /// NOTE(review): not written in this file; presumably filled in by the
    /// code that registers the security — confirm.
    pub(crate) board: SecurityBoard,
    /// Candlestick series per period.
    pub(crate) candlesticks: HashMap<Period, Candlesticks>,
}
223
/// In-memory store of per-security data built up from push events.
#[derive(Debug, Default)]
pub(crate) struct Store {
    /// Per-security data, keyed by the symbol of the push events it came from.
    pub(crate) securities: HashMap<String, SecuritiesData>,
}
228
229impl Store {
230 pub(crate) fn handle_push(&mut self, event: &mut PushEvent) {
231 let data = self.securities.entry(event.symbol.clone()).or_default();
232
233 match &mut event.detail {
234 PushEventDetail::Quote(quote) => merge_quote(data, quote),
235 PushEventDetail::Depth(depth) => merge_depth(data, depth),
236 PushEventDetail::Brokers(brokers) => merge_brokers(data, brokers),
237 PushEventDetail::Trade(trade) => merge_trades(data, trade),
238 PushEventDetail::Candlestick(_) => unreachable!(),
239 }
240 }
241}
242
243fn merge_quote(data: &mut SecuritiesData, quote: &mut PushQuote) {
244 let prev_quote = &data.quote;
245 let new_quote = PushQuote {
246 last_done: merge_decimal!(prev_quote, quote, last_done),
247 open: merge_decimal!(prev_quote, quote, open),
248 high: merge_decimal!(prev_quote, quote, high),
249 low: merge_decimal!(prev_quote, quote, low),
250 timestamp: quote.timestamp,
251 volume: merge_i64!(prev_quote, quote, volume),
252 turnover: merge_decimal!(prev_quote, quote, turnover),
253 trade_status: quote.trade_status,
254 trade_session: quote.trade_session,
255 current_volume: quote.current_volume,
256 current_turnover: quote.current_turnover,
257 };
258 data.quote = new_quote.clone();
259 *quote = new_quote;
260}
261
262fn merge_depth(data: &mut SecuritiesData, depth: &PushDepth) {
263 replace(&mut data.asks, depth.asks.clone(), |v| v.position);
264 replace(&mut data.bids, depth.bids.clone(), |v| v.position);
265}
266
267fn merge_brokers(data: &mut SecuritiesData, brokers: &PushBrokers) {
268 replace(&mut data.ask_brokers, brokers.ask_brokers.clone(), |v| {
269 v.position
270 });
271 replace(&mut data.bid_brokers, brokers.bid_brokers.clone(), |v| {
272 v.position
273 });
274}
275
276fn merge_trades(data: &mut SecuritiesData, trades: &PushTrades) {
277 data.trades.extend(trades.trades.clone());
278 if data.trades.len() > MAX_TRADES * 2 {
279 data.trades.drain(..MAX_TRADES);
280 }
281}
282
/// Upserts every item of `others` into `elements`, keyed by `f`.
///
/// `elements` must be sorted by the key `f` extracts, because each lookup is
/// a binary search. An item whose key is already present overwrites that
/// entry; otherwise it is inserted where the search says it belongs, so the
/// ordering is preserved. Items are processed in the order given.
fn replace<T, B, F>(elements: &mut Vec<T>, others: Vec<T>, f: F)
where
    F: Fn(&T) -> B,
    B: Ord,
{
    others.into_iter().for_each(|incoming| {
        let key = f(&incoming);
        let lookup = elements.binary_search_by_key(&key, |existing| f(existing));
        match lookup {
            Ok(slot) => elements[slot] = incoming,
            Err(slot) => elements.insert(slot, incoming),
        }
    });
}
295
296pub(crate) fn get_market(
297 market: Market,
298 board: SecurityBoard,
299) -> Option<&'static longport_candlesticks::Market> {
300 use longport_candlesticks::markets::*;
301
302 Some(match market {
303 Market::US if board == SecurityBoard::USOptionS => &US_OPTION,
304 Market::US => &US,
305 Market::HK => &HK,
306 Market::SG => &SG,
307 Market::CN => &CN,
308 Market::Unknown => return None,
309 })
310}
311
312fn convert_period(period: Period) -> longport_candlesticks::Period {
313 use longport_candlesticks::Period::*;
314
315 match period {
316 Period::UnknownPeriod => unreachable!(),
317 Period::OneMinute => Min_1,
318 Period::TwoMinute => Min_2,
319 Period::ThreeMinute => Min_3,
320 Period::FiveMinute => Min_5,
321 Period::TenMinute => Min_10,
322 Period::FifteenMinute => Min_15,
323 Period::TwentyMinute => Min_20,
324 Period::ThirtyMinute => Min_30,
325 Period::FortyFiveMinute => Min_45,
326 Period::SixtyMinute => Min_60,
327 Period::TwoHour => Min_120,
328 Period::ThreeHour => Min_180,
329 Period::FourHour => Min_240,
330 Period::Day => Day,
331 Period::Week => Week,
332 Period::Month => Month,
333 Period::Quarter => Quarter,
334 Period::Year => Year,
335 }
336}