You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

spool.rs 6.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. use std::collections::*;
  2. use std::path::PathBuf;
  3. use std::sync::Arc;
  4. use futures::compat::Stream01CompatExt;
  5. use futures::prelude::*;
  6. use hyper::client::connect::dns::GaiResolver;
  7. use hyper::client::connect::HttpConnector;
  8. use hyper_tls::HttpsConnector;
  9. use inotify::ffi::*;
  10. use tokio::fs as tokiofs;
  11. use tokio::prelude::*;
  12. use tokio_inotify;
  13. use url::Url;
  14. use ruma_client::Client;
  15. use ruma_client_api::r0::message as rumamessage;
  16. use ruma_events::{self, room::message::*};
  17. use ruma_identifiers::RoomId;
  18. use crate::config::*;
  19. type MatrixClient = Client<HttpsConnector<HttpConnector<GaiResolver>>>;
  20. type MessageRequest = rumamessage::create_message_event::Request;
  21. #[derive(Debug)]
  22. pub enum Error {
  23. FileFormatMismatch,
  24. BadUrl,
  25. Io(io::Error),
  26. MtxClient(ruma_client::Error),
  27. }
  28. impl From<io::Error> for Error {
  29. fn from(f: io::Error) -> Self {
  30. Self::Io(f)
  31. }
  32. }
  33. impl From<ruma_client::Error> for Error {
  34. fn from(f: ruma_client::Error) -> Self {
  35. Self::MtxClient(f)
  36. }
  37. }
  38. #[derive(Clone)]
  39. struct SpoolAction {
  40. client: Arc<MatrixClient>,
  41. room: RoomId,
  42. delay_secs: u32,
  43. watch_path: PathBuf,
  44. }
  45. pub async fn start_spooling(conf: Config) -> Result<(), Error> {
  46. let ain = tokio_inotify::AsyncINotify::init().expect("inotify init");
  47. let mut clients = HashMap::new();
  48. let mut watch_map = HashMap::new();
  49. for s in conf.spool_dirs {
  50. let label = s.sender_acct_label;
  51. let w = ain.add_watch(&s.path, IN_CLOSE_WRITE | IN_MOVED_TO)?;
  52. println!("Added watch: {}", s.path.to_str().unwrap_or("[non-UTF-8]"));
  53. // TODO Make this better.
  54. let cli = if !clients.contains_key(&label) {
  55. let acc = conf.accounts.get(&label).expect("missing account");
  56. let cli = create_and_auth_client(acc.clone()).await?;
  57. clients.insert(label, cli.clone());
  58. cli
  59. } else {
  60. clients.get(&label).expect("missing account").clone()
  61. };
  62. let sa = SpoolAction {
  63. client: cli,
  64. room: s.dest_room_id,
  65. delay_secs: s.send_delay_sec,
  66. watch_path: s.path.clone(),
  67. };
  68. watch_map.insert(w, sa);
  69. }
  70. let watch_map = Arc::new(watch_map);
  71. // This is horrible.
  72. let _ = ain
  73. .compat()
  74. .map_err(Error::from)
  75. .try_for_each({
  76. |e| {
  77. // Not sure why we have to do this clone here outside.
  78. let wm = watch_map.clone();
  79. async move {
  80. // I don't like these clones but idk any better way.
  81. let act = match wm.as_ref().get(&e.wd) {
  82. Some(a) => a.clone(),
  83. None => {
  84. println!(
  85. "got a wd that was not from a watch we added, ignoring: {:?}",
  86. e.wd
  87. );
  88. return Ok(());
  89. }
  90. };
  91. //let pb = e.name.clone();
  92. // TODO This should be spawning a new task.
  93. //tokio::spawn(async move {
  94. // TODO Respect delay.
  95. match process_file(&e.name, &act).await {
  96. Ok(()) => {} // ok I guess?
  97. Err(e) => println!("Error processing file: {:?}", e),
  98. }
  99. //});
  100. Ok(())
  101. }
  102. }
  103. })
  104. .await?;
  105. Ok(())
  106. }
  107. async fn process_file(p: &PathBuf, sa: &SpoolAction) -> Result<(), Error> {
  108. let ext = match p.extension().map(|e| e.to_str()).flatten() {
  109. Some(v) => v,
  110. None => {
  111. println!("Found weird file {:?}, ignoring", p);
  112. return Ok(());
  113. }
  114. };
  115. let name = p
  116. .file_name()
  117. .map(|e| e.to_str())
  118. .flatten()
  119. .unwrap_or("[non-UTF-8]");
  120. // This makes me *mad*.
  121. let mut real_path = sa.watch_path.clone();
  122. real_path.push(p);
  123. match ext {
  124. "txt" => {
  125. println!("Processing file for {} at {:?}", sa.room, p);
  126. let buf = match file_as_string(&real_path).await {
  127. Ok(v) => v,
  128. Err(Error::FileFormatMismatch) => {
  129. println!("File {} is not UTF-8, ignoring", name);
  130. return Ok(());
  131. }
  132. Err(e) => return Err(e),
  133. };
  134. let mut rng = rand::thread_rng();
  135. let req = make_text_request(sa.room.clone(), buf.as_str(), &mut rng);
  136. match sa.client.as_ref().request(req).await {
  137. Ok(_) => {
  138. // Now delete it if it passed.
  139. tokiofs::remove_file(real_path).await?;
  140. }
  141. Err(e) => println!("Error processing {}: {:?}", name, e),
  142. }
  143. }
  144. _ => println!(
  145. "Found file {:?}, but it has unsupported extension \"{}\"",
  146. p, ext
  147. ),
  148. }
  149. Ok(())
  150. }
  151. async fn file_as_string(p: &PathBuf) -> Result<String, Error> {
  152. let mut buf = Vec::new();
  153. let mut f = tokiofs::File::open(p).await?;
  154. f.read_to_end(&mut buf).await?;
  155. String::from_utf8(buf).map_err(|_| Error::FileFormatMismatch)
  156. }
  157. async fn create_and_auth_client(acct: Account) -> Result<Arc<MatrixClient>, Error> {
  158. let hs_url = Url::parse(&acct.homeserver).map_err(|_| Error::BadUrl)?;
  159. let c = MatrixClient::https(hs_url, None);
  160. match acct.auth {
  161. Auth::UsernamePass(un, pw) => c.log_in(un, pw, acct.device_id, acct.display).await?,
  162. };
  163. Ok(Arc::new(c))
  164. }
  165. fn make_text_request<R: rand::Rng>(room_id: RoomId, msg: &str, rng: &mut R) -> MessageRequest {
  166. let inner = TextMessageEventContent {
  167. body: String::from(msg),
  168. format: None,
  169. formatted_body: None,
  170. relates_to: None,
  171. };
  172. let mec = MessageEventContent::Text(inner);
  173. MessageRequest {
  174. room_id: room_id,
  175. event_type: ruma_events::EventType::RoomMessage,
  176. txn_id: make_txn_id(rng),
  177. data: mec,
  178. }
  179. }
  180. const TXN_ID_LEN: usize = 20;
  181. fn make_txn_id<R: rand::Rng>(rng: &mut R) -> String {
  182. let mut buf = String::with_capacity(TXN_ID_LEN);
  183. for _ in 0..TXN_ID_LEN {
  184. buf.push((rng.gen_range(0, 26) + ('a' as u8)) as char);
  185. }
  186. buf
  187. }