You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

spool.rs 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. use std::collections::*;
  2. use std::io;
  3. use std::path::PathBuf;
  4. use std::sync::Arc;
  5. use std::time;
  6. use futures::compat::Stream01CompatExt;
  7. use futures::prelude::*;
  8. use inotify::ffi::*;
  9. use matrix_sdk::identifiers::RoomId;
  10. use tokio::fs as tokiofs;
  11. use tokio::io::{AsyncRead, AsyncReadExt};
  12. use tokio::sync::mpsc;
  13. use tokio_inotify;
  14. use crate::client::{self, MatrixClient};
  15. use crate::config::{self, *};
  16. #[derive(Debug)]
  17. pub enum Error {
  18. FileFormatMismatch,
  19. UnspecifiedClient(String),
  20. Io(io::Error),
  21. Matrix(matrix_sdk::Error),
  22. }
  23. impl From<io::Error> for Error {
  24. fn from(f: io::Error) -> Self {
  25. Self::Io(f)
  26. }
  27. }
  28. impl From<matrix_sdk::Error> for Error {
  29. fn from(f: matrix_sdk::Error) -> Self {
  30. Self::Matrix(f)
  31. }
  32. }
  33. #[derive(Clone)]
  34. struct SpoolAction {
  35. client: Arc<MatrixClient>,
  36. room: RoomId,
  37. delay_secs: u32,
  38. watch_path: PathBuf,
  39. }
  40. pub async fn start_spoolers(
  41. conf: Config,
  42. client_chans: HashMap<String, mpsc::Sender<client::Message>>,
  43. ) -> Result<(), Error> {
  44. for sd in conf.spool_dirs {
  45. let chan = client_chans
  46. .get(&sd.sender_acct_label)
  47. .ok_or_else(|| Error::UnspecifiedClient(sd.sender_acct_label.clone()))?;
  48. let ain = tokio_inotify::AsyncINotify::init().expect("spool: inotify init");
  49. let w = ain.add_watch(&sd.path, IN_CLOSE_WRITE | IN_MOVED_TO)?;
  50. println!("[spool] added watch: {:?}", sd.path);
  51. tokio::spawn(do_watch_dir(ain, sd, chan.clone()));
  52. }
  53. Ok(())
  54. }
  55. async fn do_watch_dir(
  56. inot: tokio_inotify::AsyncINotify,
  57. sdc: config::SpoolDir,
  58. mut dest: mpsc::Sender<client::Message>,
  59. ) {
  60. let mut iter = inot.compat();
  61. while let Some(ent) = iter.next().await {
  62. match ent {
  63. Ok(ent) => {
  64. // Just succ up the file and send it over. We'll do the
  65. // formatting later.
  66. let mut real_path = sdc.path.clone();
  67. real_path.push(ent.name);
  68. let s = match file_as_string(&real_path).await {
  69. Ok(s) => s,
  70. Err(e) => {
  71. eprintln!(
  72. "[spool] warning, could not read file, ignoring: {:?}",
  73. real_path
  74. );
  75. continue;
  76. }
  77. };
  78. let msg =
  79. client::Message::new_delay(sdc.dest_room_id.clone(), s, sdc.send_delay_sec);
  80. let tout = time::Duration::from_secs(30);
  81. dest.send_timeout(msg, tout)
  82. .map_err(|_| ())
  83. .await
  84. .expect("spool: relay channel send timeout");
  85. if let Err(e) = tokiofs::remove_file(&real_path).await {
  86. eprintln!(
  87. "[spool] warning: could not remove sent file, ignoring: {:?}",
  88. real_path
  89. );
  90. }
  91. }
  92. Err(e) => panic!("spool: error reading watch {:?}", e),
  93. }
  94. }
  95. }
  96. /*
  97. async fn process_file(p: &PathBuf, sa: &SpoolAction) -> Result<(), Error> {
  98. let ext = match p.extension().map(|e| e.to_str()).flatten() {
  99. Some(v) => v,
  100. None => {
  101. eprintln!("Found weird file {:?}, ignoring", p);
  102. return Ok(());
  103. }
  104. };
  105. let name = p
  106. .file_name()
  107. .map(|e| e.to_str())
  108. .flatten()
  109. .unwrap_or("[non-UTF-8]");
  110. // This makes me *mad*.
  111. let mut real_path = sa.watch_path.clone();
  112. real_path.push(p);
  113. match ext {
  114. "txt" => {
  115. println!("Processing file for {} at {:?}", sa.room, p);
  116. let buf = match file_as_string(&real_path).await {
  117. Ok(v) => v,
  118. Err(Error::FileFormatMismatch) => {
  119. println!("File {} is not UTF-8, ignoring", name);
  120. return Ok(());
  121. }
  122. Err(e) => return Err(e),
  123. };
  124. let mut rng = rand::thread_rng();
  125. let req = make_text_request(sa.room.clone(), buf.as_str(), &mut rng);
  126. match sa.client.as_ref().request(req).await {
  127. Ok(_) => {
  128. // Now delete it if it passed.
  129. }
  130. Err(e) => println!("Error processing {}: {:?}", name, e),
  131. }
  132. }
  133. _ => println!(
  134. "Found file {:?}, but it has unsupported extension \"{}\"",
  135. p, ext
  136. ),
  137. }
  138. Ok(())
  139. }*/
  140. async fn file_as_string(p: &PathBuf) -> Result<String, Error> {
  141. let mut buf = Vec::new();
  142. let mut f = tokiofs::File::open(p).await?;
  143. f.read_to_end(&mut buf).await?;
  144. String::from_utf8(buf).map_err(|_| Error::FileFormatMismatch)
  145. }