longportwhale/trade/
context.rs1use std::sync::Arc;
2
3use longportwhale_wscli::WsClientError;
4use tokio::sync::{mpsc, oneshot};
5
6use crate::{
7 trade::{
8 core::{Command, Core},
9 PushEvent, TopicType,
10 },
11 Config, Result,
12};
13
14#[derive(Clone)]
16pub struct TradeContext {
17 command_tx: mpsc::UnboundedSender<Command>,
18}
19
20impl TradeContext {
21 pub async fn try_new(
23 config: Arc<Config>,
24 ) -> Result<(Self, mpsc::UnboundedReceiver<PushEvent>)> {
25 let (command_tx, command_rx) = mpsc::unbounded_channel();
26 let (push_tx, push_rx) = mpsc::unbounded_channel();
27 tokio::spawn(Core::try_new(config, command_rx, push_tx).await?.run());
28 Ok((TradeContext { command_tx }, push_rx))
29 }
30
31 pub async fn subscribe<I>(&self, topics: I) -> Result<()>
35 where
36 I: IntoIterator<Item = TopicType>,
37 {
38 let (reply_tx, reply_rx) = oneshot::channel();
39 self.command_tx
40 .send(Command::Subscribe {
41 topics: topics.into_iter().collect(),
42 reply_tx,
43 })
44 .map_err(|_| WsClientError::ClientClosed)?;
45 reply_rx.await.map_err(|_| WsClientError::ClientClosed)?
46 }
47
48 pub async fn unsubscribe<I>(&self, topics: I) -> Result<()>
52 where
53 I: IntoIterator<Item = TopicType>,
54 {
55 let (reply_tx, reply_rx) = oneshot::channel();
56 self.command_tx
57 .send(Command::Unsubscribe {
58 topics: topics.into_iter().collect(),
59 reply_tx,
60 })
61 .map_err(|_| WsClientError::ClientClosed)?;
62 reply_rx.await.map_err(|_| WsClientError::ClientClosed)?
63 }
64}