Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

spool.rs 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. use core::time;
  2. use std::collections::*;
  3. use std::path::PathBuf;
  4. use std::sync::Arc;
  5. use futures::compat::Stream01CompatExt;
  6. use futures::prelude::*;
  7. use inotify::ffi::*;
  8. use matrix_sdk::identifiers::RoomId;
  9. use tokio::fs as tokiofs;
  10. use tokio::prelude::*;
  11. use tokio::sync::mpsc;
  12. use tokio_inotify;
  13. use crate::client::{self, MatrixClient};
  14. use crate::config::{self, *};
  15. #[derive(Debug)]
  16. pub enum Error {
  17. FileFormatMismatch,
  18. UnspecifiedClient(String),
  19. Io(io::Error),
  20. Matrix(matrix_sdk::Error),
  21. }
  22. impl From<io::Error> for Error {
  23. fn from(f: io::Error) -> Self {
  24. Self::Io(f)
  25. }
  26. }
  27. impl From<matrix_sdk::Error> for Error {
  28. fn from(f: matrix_sdk::Error) -> Self {
  29. Self::Matrix(f)
  30. }
  31. }
  32. #[derive(Clone)]
  33. struct SpoolAction {
  34. client: Arc<MatrixClient>,
  35. room: RoomId,
  36. delay_secs: u32,
  37. watch_path: PathBuf,
  38. }
  39. pub async fn start_spoolers(
  40. conf: Config,
  41. client_chans: HashMap<String, mpsc::Sender<client::Message>>,
  42. ) -> Result<(), Error> {
  43. for sd in conf.spool_dirs {
  44. let chan = client_chans
  45. .get(&sd.sender_acct_label)
  46. .ok_or_else(|| Error::UnspecifiedClient(sd.sender_acct_label.clone()))?;
  47. let ain = tokio_inotify::AsyncINotify::init().expect("spool: inotify init");
  48. let w = ain.add_watch(&sd.path, IN_CLOSE_WRITE | IN_MOVED_TO)?;
  49. println!("[spool] added watch: {:?}", sd.path);
  50. tokio::spawn(do_watch_dir(ain, sd, chan.clone()));
  51. }
  52. Ok(())
  53. }
  54. async fn do_watch_dir(
  55. inot: tokio_inotify::AsyncINotify,
  56. sdc: config::SpoolDir,
  57. mut dest: mpsc::Sender<client::Message>,
  58. ) {
  59. let mut iter = inot.compat();
  60. while let Some(ent) = iter.next().await {
  61. match ent {
  62. Ok(ent) => {
  63. // Just succ up the file and send it over. We'll do the
  64. // formatting later.
  65. let mut real_path = sdc.path.clone();
  66. real_path.push(ent.name);
  67. let s = match file_as_string(&real_path).await {
  68. Ok(s) => s,
  69. Err(e) => {
  70. eprintln!(
  71. "[spool] warning, could not read file, ignoring: {:?}",
  72. real_path
  73. );
  74. continue;
  75. }
  76. };
  77. let msg =
  78. client::Message::new_delay(sdc.dest_room_id.clone(), s, sdc.send_delay_sec);
  79. let tout = time::Duration::from_secs(30);
  80. dest.send_timeout(msg, tout)
  81. .map_err(|_| ())
  82. .await
  83. .expect("spool: relay channel send timeout");
  84. if let Err(e) = tokiofs::remove_file(&real_path).await {
  85. eprintln!(
  86. "[spool] warning: could not remove sent file, ignoring: {:?}",
  87. real_path
  88. );
  89. }
  90. }
  91. Err(e) => panic!("spool: error reading watch {:?}", e),
  92. }
  93. }
  94. }
  95. /*
  96. async fn process_file(p: &PathBuf, sa: &SpoolAction) -> Result<(), Error> {
  97. let ext = match p.extension().map(|e| e.to_str()).flatten() {
  98. Some(v) => v,
  99. None => {
  100. eprintln!("Found weird file {:?}, ignoring", p);
  101. return Ok(());
  102. }
  103. };
  104. let name = p
  105. .file_name()
  106. .map(|e| e.to_str())
  107. .flatten()
  108. .unwrap_or("[non-UTF-8]");
  109. // This makes me *mad*.
  110. let mut real_path = sa.watch_path.clone();
  111. real_path.push(p);
  112. match ext {
  113. "txt" => {
  114. println!("Processing file for {} at {:?}", sa.room, p);
  115. let buf = match file_as_string(&real_path).await {
  116. Ok(v) => v,
  117. Err(Error::FileFormatMismatch) => {
  118. println!("File {} is not UTF-8, ignoring", name);
  119. return Ok(());
  120. }
  121. Err(e) => return Err(e),
  122. };
  123. let mut rng = rand::thread_rng();
  124. let req = make_text_request(sa.room.clone(), buf.as_str(), &mut rng);
  125. match sa.client.as_ref().request(req).await {
  126. Ok(_) => {
  127. // Now delete it if it passed.
  128. }
  129. Err(e) => println!("Error processing {}: {:?}", name, e),
  130. }
  131. }
  132. _ => println!(
  133. "Found file {:?}, but it has unsupported extension \"{}\"",
  134. p, ext
  135. ),
  136. }
  137. Ok(())
  138. }*/
  139. async fn file_as_string(p: &PathBuf) -> Result<String, Error> {
  140. let mut buf = Vec::new();
  141. let mut f = tokiofs::File::open(p).await?;
  142. f.read_to_end(&mut buf).await?;
  143. String::from_utf8(buf).map_err(|_| Error::FileFormatMismatch)
  144. }