1use std::collections::HashMap;
2
3use longport_candlesticks::{Days, UpdateAction, UpdateFields};
4use longport_proto::quote::Period;
5
6use crate::{
7 Market,
8 quote::{
9 Brokers, Candlestick, Depth, PushBrokers, PushDepth, PushEvent, PushTrades, SecurityBoard,
10 Trade, TradeDirection, TradeSession, TradeSessions,
11 push_types::{PushEventDetail, PushQuote},
12 },
13};
14
15const MAX_TRADES: usize = 500;
16const MAX_CANDLESTICKS: usize = 500;
17
18macro_rules! merge_decimal {
19 ($prev:expr, $new:expr, $field:ident) => {
20 if !$new.$field.is_zero() {
21 $new.$field
22 } else {
23 $prev.$field
24 }
25 };
26}
27
28macro_rules! merge_i64 {
29 ($prev:expr, $new:expr, $field:ident) => {
30 if $new.$field != 0 {
31 $new.$field
32 } else {
33 $prev.$field
34 }
35 };
36}
37
38#[derive(Debug)]
39pub(crate) struct TailCandlestick {
40 pub(crate) index: usize,
41 pub(crate) candlestick: Candlestick,
42}
43
44#[derive(Debug)]
45pub(crate) struct Candlesticks {
46 pub(crate) trade_sessions: TradeSessions,
47 pub(crate) candlesticks: Vec<Candlestick>,
48 pub(crate) tails: HashMap<TradeSession, TailCandlestick>,
49}
50
51impl Candlesticks {
52 #[inline]
53 fn merge_input(&self, ts: TradeSession) -> Option<Candlestick> {
54 self.tails.get(&ts).map(|tail| tail.candlestick)
55 }
56
57 pub(crate) fn insert_candlestick_by_time(&mut self, candlestick: Candlestick) -> usize {
58 let timestamp = candlestick.timestamp;
59 match self
60 .candlesticks
61 .binary_search_by_key(×tamp, |c| c.timestamp)
62 {
63 Ok(index) => {
64 self.candlesticks[index] = candlestick;
65 index
66 }
67 Err(index) => {
68 self.candlesticks.insert(index, candlestick);
69 index
70 }
71 }
72 }
73
74 pub(crate) fn merge_trade<H>(
75 &mut self,
76 market_type: Market,
77 half_days: H,
78 board: SecurityBoard,
79 period: Period,
80 trade: &Trade,
81 ) -> UpdateAction<Candlestick>
82 where
83 H: Days,
84 {
85 let Some(market) = get_market(market_type, board) else {
86 return UpdateAction::None;
87 };
88 let ts = trade.trade_session;
89 let period = convert_period(period);
90
91 let trade_type = trade.trade_type.as_str();
92 let update_fields = match market_type {
93 Market::Unknown => unreachable!(),
94 Market::HK => match trade_type {
95 "" => UpdateFields::all(),
96 "D" => UpdateFields::VOLUME,
97 "M" => UpdateFields::VOLUME,
98 "P" => UpdateFields::VOLUME,
99 "U" => UpdateFields::all(),
100 "X" => UpdateFields::VOLUME,
101 "Y" => UpdateFields::VOLUME,
102 _ => UpdateFields::empty(),
103 },
104 Market::US => match trade_type {
105 "" => UpdateFields::all(),
106 "A" => UpdateFields::all(),
107 "B" => UpdateFields::all(),
108 "C" => UpdateFields::VOLUME,
109 "D" => UpdateFields::all(),
110 "E" => UpdateFields::all(),
111 "F" => UpdateFields::all(),
112 "G" => UpdateFields::VOLUME,
113 "H" => UpdateFields::VOLUME,
114 "I" if board == SecurityBoard::USOption || board == SecurityBoard::USOptionS => {
115 UpdateFields::all()
116 }
117 "I" if ts == TradeSession::Pre || ts == TradeSession::Post => UpdateFields::all(),
118 "I" => UpdateFields::VOLUME,
119 "K" => UpdateFields::all(),
120 "M" => UpdateFields::empty(),
121 "P" => UpdateFields::empty(),
122 "S" => UpdateFields::all(),
123 "V" => UpdateFields::VOLUME,
124 "W" => UpdateFields::VOLUME,
125 "X" => UpdateFields::all(),
126 "1" => UpdateFields::all(),
127 _ => UpdateFields::empty(),
128 },
129 Market::CN | Market::SG | Market::Crypto => UpdateFields::all(),
130 };
131
132 market.merge_trade(
133 half_days,
134 period,
135 self.merge_input(ts),
136 trade,
137 update_fields,
138 )
139 }
140
141 pub(crate) fn merge_quote_day(
142 &mut self,
143 market_type: Market,
144 board: SecurityBoard,
145 push_quote: &PushQuote,
146 ) -> UpdateAction<Candlestick> {
147 let Some(market) = get_market(market_type, board) else {
148 return UpdateAction::None;
149 };
150 let ts = push_quote.trade_session;
151 market.merge_quote_day(self.merge_input(ts), push_quote)
152 }
153
154 pub(crate) fn merge_quote<H>(
155 &mut self,
156 market_type: Market,
157 half_days: H,
158 board: SecurityBoard,
159 period: Period,
160 push_quote: &PushQuote,
161 ) -> UpdateAction<Candlestick>
162 where
163 H: Days,
164 {
165 let Some(market) = get_market(market_type, board) else {
166 return UpdateAction::None;
167 };
168 let ts = push_quote.trade_session;
169 let period = convert_period(period);
170 market.merge_trade(
171 half_days,
172 period,
173 self.merge_input(ts),
174 &Trade {
175 price: push_quote.last_done,
176 volume: push_quote.current_volume,
177 timestamp: push_quote.timestamp,
178 trade_type: String::new(),
179 direction: TradeDirection::Neutral,
180 trade_session: push_quote.trade_session,
181 },
182 UpdateFields::all(),
183 )
184 }
185
186 pub(crate) fn check_and_remove(&mut self) {
187 if self.candlesticks.len() <= MAX_CANDLESTICKS * 2 {
188 return;
189 }
190
191 let remove_count = self.candlesticks.len() - MAX_CANDLESTICKS;
192 let mut remove_tails = vec![];
193
194 for (ts, tail) in &mut self.tails {
195 if tail.index >= remove_count {
196 tail.index -= remove_count;
197 } else {
198 remove_tails.push(*ts);
199 }
200 }
201
202 for ts in remove_tails {
203 self.tails.remove(&ts);
204 }
205
206 self.candlesticks.drain(..remove_count);
207 }
208}
209
210#[derive(Debug, Default)]
211pub(crate) struct SecuritiesData {
212 pub(crate) quote: PushQuote,
213
214 pub(crate) asks: Vec<Depth>,
215 pub(crate) bids: Vec<Depth>,
216
217 pub(crate) ask_brokers: Vec<Brokers>,
218 pub(crate) bid_brokers: Vec<Brokers>,
219
220 pub(crate) trades: Vec<Trade>,
221
222 pub(crate) board: SecurityBoard,
223 pub(crate) candlesticks: HashMap<Period, Candlesticks>,
224}
225
226#[derive(Debug, Default)]
227pub(crate) struct Store {
228 pub(crate) securities: HashMap<String, SecuritiesData>,
229}
230
231impl Store {
232 pub(crate) fn handle_push(&mut self, event: &mut PushEvent) {
233 let data = self.securities.entry(event.symbol.clone()).or_default();
234
235 match &mut event.detail {
236 PushEventDetail::Quote(quote) => merge_quote(data, quote),
237 PushEventDetail::Depth(depth) => merge_depth(data, depth),
238 PushEventDetail::Brokers(brokers) => merge_brokers(data, brokers),
239 PushEventDetail::Trade(trade) => merge_trades(data, trade),
240 PushEventDetail::Candlestick(_) => unreachable!(),
241 }
242 }
243}
244
245fn merge_quote(data: &mut SecuritiesData, quote: &mut PushQuote) {
246 let prev_quote = &data.quote;
247 let new_quote = PushQuote {
248 last_done: merge_decimal!(prev_quote, quote, last_done),
249 open: merge_decimal!(prev_quote, quote, open),
250 high: merge_decimal!(prev_quote, quote, high),
251 low: merge_decimal!(prev_quote, quote, low),
252 timestamp: quote.timestamp,
253 volume: merge_i64!(prev_quote, quote, volume),
254 turnover: merge_decimal!(prev_quote, quote, turnover),
255 trade_status: quote.trade_status,
256 trade_session: quote.trade_session,
257 current_volume: quote.current_volume,
258 current_turnover: quote.current_turnover,
259 };
260 data.quote = new_quote.clone();
261 *quote = new_quote;
262}
263
264fn merge_depth(data: &mut SecuritiesData, depth: &PushDepth) {
265 replace(&mut data.asks, depth.asks.clone(), |v| v.position);
266 replace(&mut data.bids, depth.bids.clone(), |v| v.position);
267}
268
269fn merge_brokers(data: &mut SecuritiesData, brokers: &PushBrokers) {
270 replace(&mut data.ask_brokers, brokers.ask_brokers.clone(), |v| {
271 v.position
272 });
273 replace(&mut data.bid_brokers, brokers.bid_brokers.clone(), |v| {
274 v.position
275 });
276}
277
278fn merge_trades(data: &mut SecuritiesData, trades: &PushTrades) {
279 data.trades.extend(trades.trades.clone());
280 if data.trades.len() > MAX_TRADES * 2 {
281 data.trades.drain(..MAX_TRADES);
282 }
283}
284
285fn replace<T, B, F>(elements: &mut Vec<T>, others: Vec<T>, f: F)
286where
287 F: Fn(&T) -> B,
288 B: Ord,
289{
290 for v in others {
291 match elements.binary_search_by_key(&f(&v), &f) {
292 Ok(index) => elements[index] = v,
293 Err(index) => elements.insert(index, v),
294 }
295 }
296}
297
298pub(crate) fn get_market(
299 market: Market,
300 board: SecurityBoard,
301) -> Option<&'static longport_candlesticks::Market> {
302 use longport_candlesticks::markets::*;
303
304 Some(match market {
305 Market::US if board == SecurityBoard::USOptionS => &US_OPTION,
306 Market::US => &US,
307 Market::HK => &HK,
308 Market::SG => &SG,
309 Market::CN => &CN,
310 Market::Unknown | Market::Crypto => return None,
311 })
312}
313
314fn convert_period(period: Period) -> longport_candlesticks::Period {
315 use longport_candlesticks::Period::*;
316
317 match period {
318 Period::UnknownPeriod => unreachable!(),
319 Period::OneMinute => Min_1,
320 Period::TwoMinute => Min_2,
321 Period::ThreeMinute => Min_3,
322 Period::FiveMinute => Min_5,
323 Period::TenMinute => Min_10,
324 Period::FifteenMinute => Min_15,
325 Period::TwentyMinute => Min_20,
326 Period::ThirtyMinute => Min_30,
327 Period::FortyFiveMinute => Min_45,
328 Period::SixtyMinute => Min_60,
329 Period::TwoHour => Min_120,
330 Period::ThreeHour => Min_180,
331 Period::FourHour => Min_240,
332 Period::Day => Day,
333 Period::Week => Week,
334 Period::Month => Month,
335 Period::Quarter => Quarter,
336 Period::Year => Year,
337 }
338}