1use std::collections::HashMap;
2
3use longport_candlesticks::{Days, UpdateAction, UpdateFields};
4use longport_proto::quote::Period;
5
6use crate::{
7 Market,
8 quote::{
9 Brokers, Candlestick, Depth, PushBrokers, PushDepth, PushEvent, PushTrades, SecurityBoard,
10 Trade, TradeSession, TradeSessions,
11 push_types::{PushEventDetail, PushQuote},
12 },
13};
14
15const MAX_TRADES: usize = 500;
16const MAX_CANDLESTICKS: usize = 500;
17
18macro_rules! merge_decimal {
19 ($prev:expr, $new:expr, $field:ident) => {
20 if !$new.$field.is_zero() {
21 $new.$field
22 } else {
23 $prev.$field
24 }
25 };
26}
27
28macro_rules! merge_i64 {
29 ($prev:expr, $new:expr, $field:ident) => {
30 if $new.$field != 0 {
31 $new.$field
32 } else {
33 $prev.$field
34 }
35 };
36}
37
38#[derive(Debug)]
39pub(crate) struct TailCandlestick {
40 pub(crate) index: usize,
41 pub(crate) candlestick: Candlestick,
42}
43
44#[derive(Debug)]
45pub(crate) struct Candlesticks {
46 pub(crate) trade_sessions: TradeSessions,
47 pub(crate) candlesticks: Vec<Candlestick>,
48 pub(crate) tails: HashMap<TradeSession, TailCandlestick>,
49}
50
51impl Candlesticks {
52 #[inline]
53 fn merge_input(&self, ts: TradeSession) -> Option<Candlestick> {
54 self.tails.get(&ts).map(|tail| tail.candlestick)
55 }
56
57 pub(crate) fn insert_candlestick_by_time(&mut self, candlestick: Candlestick) -> usize {
58 let timestamp = candlestick.timestamp;
59 match self
60 .candlesticks
61 .binary_search_by_key(×tamp, |c| c.timestamp)
62 {
63 Ok(index) => {
64 self.candlesticks[index] = candlestick;
65 index
66 }
67 Err(index) => {
68 self.candlesticks.insert(index, candlestick);
69 index
70 }
71 }
72 }
73
74 pub(crate) fn merge_trade<H>(
75 &mut self,
76 market_type: Market,
77 half_days: H,
78 board: SecurityBoard,
79 period: Period,
80 trade: &Trade,
81 ) -> UpdateAction<Candlestick>
82 where
83 H: Days,
84 {
85 let Some(market) = get_market(market_type, board) else {
86 return UpdateAction::None;
87 };
88 let ts = trade.trade_session;
89 let period = convert_period(period);
90
91 let trade_type = trade.trade_type.as_str();
92 let update_fields = match market_type {
93 Market::Unknown => unreachable!(),
94 Market::HK => match trade_type {
95 "" => UpdateFields::all(),
96 "D" => UpdateFields::VOLUME,
97 "M" => UpdateFields::VOLUME,
98 "P" => UpdateFields::VOLUME,
99 "U" => UpdateFields::all(),
100 "X" => UpdateFields::VOLUME,
101 "Y" => UpdateFields::VOLUME,
102 _ => UpdateFields::empty(),
103 },
104 Market::US => match trade_type {
105 "" => UpdateFields::all(),
106 "A" => UpdateFields::all(),
107 "B" => UpdateFields::all(),
108 "C" => UpdateFields::VOLUME,
109 "D" => UpdateFields::all(),
110 "E" => UpdateFields::all(),
111 "F" => UpdateFields::all(),
112 "G" => UpdateFields::VOLUME,
113 "H" => UpdateFields::VOLUME,
114 "I" if board == SecurityBoard::USOption || board == SecurityBoard::USOptionS => {
115 UpdateFields::all()
116 }
117 "I" if ts == TradeSession::Pre || ts == TradeSession::Post => UpdateFields::all(),
118 "I" => UpdateFields::VOLUME,
119 "K" => UpdateFields::all(),
120 "M" => UpdateFields::empty(),
121 "P" => UpdateFields::empty(),
122 "S" => UpdateFields::all(),
123 "V" => UpdateFields::VOLUME,
124 "W" => UpdateFields::VOLUME,
125 "X" => UpdateFields::all(),
126 "1" => UpdateFields::all(),
127 _ => UpdateFields::empty(),
128 },
129 Market::CN | Market::SG | Market::Crypto => UpdateFields::all(),
130 };
131
132 market.merge_trade(
133 half_days,
134 period,
135 self.merge_input(ts),
136 trade,
137 update_fields,
138 )
139 }
140
141 pub(crate) fn merge_quote_day(
142 &mut self,
143 market_type: Market,
144 board: SecurityBoard,
145 push_quote: &PushQuote,
146 ) -> UpdateAction<Candlestick> {
147 let Some(market) = get_market(market_type, board) else {
148 return UpdateAction::None;
149 };
150 let ts = push_quote.trade_session;
151 market.merge_quote_day(self.merge_input(ts), push_quote)
152 }
153
154 pub(crate) fn merge_quote<H>(
155 &mut self,
156 market_type: Market,
157 half_days: H,
158 board: SecurityBoard,
159 period: Period,
160 push_quote: &PushQuote,
161 ) -> UpdateAction<Candlestick>
162 where
163 H: Days,
164 {
165 let Some(market) = get_market(market_type, board) else {
166 return UpdateAction::None;
167 };
168 let ts = push_quote.trade_session;
169 let period = convert_period(period);
170 market.merge_trade(
171 half_days,
172 period,
173 self.merge_input(ts),
174 push_quote,
175 UpdateFields::all(),
176 )
177 }
178
179 pub(crate) fn check_and_remove(&mut self) {
180 if self.candlesticks.len() <= MAX_CANDLESTICKS * 2 {
181 return;
182 }
183
184 let remove_count = self.candlesticks.len() - MAX_CANDLESTICKS;
185 let mut remove_tails = vec![];
186
187 for (ts, tail) in &mut self.tails {
188 if tail.index >= remove_count {
189 tail.index -= remove_count;
190 } else {
191 remove_tails.push(*ts);
192 }
193 }
194
195 for ts in remove_tails {
196 self.tails.remove(&ts);
197 }
198
199 self.candlesticks.drain(..remove_count);
200 }
201}
202
203#[derive(Debug, Default)]
204pub(crate) struct SecuritiesData {
205 pub(crate) quote: PushQuote,
206
207 pub(crate) asks: Vec<Depth>,
208 pub(crate) bids: Vec<Depth>,
209
210 pub(crate) ask_brokers: Vec<Brokers>,
211 pub(crate) bid_brokers: Vec<Brokers>,
212
213 pub(crate) trades: Vec<Trade>,
214
215 pub(crate) board: SecurityBoard,
216 pub(crate) candlesticks: HashMap<Period, Candlesticks>,
217}
218
219#[derive(Debug, Default)]
220pub(crate) struct Store {
221 pub(crate) securities: HashMap<String, SecuritiesData>,
222}
223
224impl Store {
225 pub(crate) fn handle_push(&mut self, event: &mut PushEvent) {
226 let data = self.securities.entry(event.symbol.clone()).or_default();
227
228 match &mut event.detail {
229 PushEventDetail::Quote(quote) => merge_quote(data, quote),
230 PushEventDetail::Depth(depth) => merge_depth(data, depth),
231 PushEventDetail::Brokers(brokers) => merge_brokers(data, brokers),
232 PushEventDetail::Trade(trade) => merge_trades(data, trade),
233 PushEventDetail::Candlestick(_) => unreachable!(),
234 }
235 }
236}
237
238fn merge_quote(data: &mut SecuritiesData, quote: &mut PushQuote) {
239 let prev_quote = &data.quote;
240 let new_quote = PushQuote {
241 last_done: merge_decimal!(prev_quote, quote, last_done),
242 open: merge_decimal!(prev_quote, quote, open),
243 high: merge_decimal!(prev_quote, quote, high),
244 low: merge_decimal!(prev_quote, quote, low),
245 timestamp: quote.timestamp,
246 volume: merge_i64!(prev_quote, quote, volume),
247 turnover: merge_decimal!(prev_quote, quote, turnover),
248 trade_status: quote.trade_status,
249 trade_session: quote.trade_session,
250 current_volume: quote.current_volume,
251 current_turnover: quote.current_turnover,
252 };
253 data.quote = new_quote.clone();
254 *quote = new_quote;
255}
256
257fn merge_depth(data: &mut SecuritiesData, depth: &PushDepth) {
258 replace(&mut data.asks, depth.asks.clone(), |v| v.position);
259 replace(&mut data.bids, depth.bids.clone(), |v| v.position);
260}
261
262fn merge_brokers(data: &mut SecuritiesData, brokers: &PushBrokers) {
263 replace(&mut data.ask_brokers, brokers.ask_brokers.clone(), |v| {
264 v.position
265 });
266 replace(&mut data.bid_brokers, brokers.bid_brokers.clone(), |v| {
267 v.position
268 });
269}
270
271fn merge_trades(data: &mut SecuritiesData, trades: &PushTrades) {
272 data.trades.extend(trades.trades.clone());
273 if data.trades.len() > MAX_TRADES * 2 {
274 data.trades.drain(..MAX_TRADES);
275 }
276}
277
278fn replace<T, B, F>(elements: &mut Vec<T>, others: Vec<T>, f: F)
279where
280 F: Fn(&T) -> B,
281 B: Ord,
282{
283 for v in others {
284 match elements.binary_search_by_key(&f(&v), &f) {
285 Ok(index) => elements[index] = v,
286 Err(index) => elements.insert(index, v),
287 }
288 }
289}
290
291pub(crate) fn get_market(
292 market: Market,
293 board: SecurityBoard,
294) -> Option<&'static longport_candlesticks::Market> {
295 use longport_candlesticks::markets::*;
296
297 Some(match market {
298 Market::US if board == SecurityBoard::USOptionS => &US_OPTION,
299 Market::US => &US,
300 Market::HK => &HK,
301 Market::SG => &SG,
302 Market::CN => &CN,
303 Market::Unknown | Market::Crypto => return None,
304 })
305}
306
307fn convert_period(period: Period) -> longport_candlesticks::Period {
308 use longport_candlesticks::Period::*;
309
310 match period {
311 Period::UnknownPeriod => unreachable!(),
312 Period::OneMinute => Min_1,
313 Period::TwoMinute => Min_2,
314 Period::ThreeMinute => Min_3,
315 Period::FiveMinute => Min_5,
316 Period::TenMinute => Min_10,
317 Period::FifteenMinute => Min_15,
318 Period::TwentyMinute => Min_20,
319 Period::ThirtyMinute => Min_30,
320 Period::FortyFiveMinute => Min_45,
321 Period::SixtyMinute => Min_60,
322 Period::TwoHour => Min_120,
323 Period::ThreeHour => Min_180,
324 Period::FourHour => Min_240,
325 Period::Day => Day,
326 Period::Week => Week,
327 Period::Month => Month,
328 Period::Quarter => Quarter,
329 Period::Year => Year,
330 }
331}