Browse Source

Rewrote most of the spooling code and split out the client message handling so we use a bunch of different tasks now that communicate by channels.

master
treyzania 2 years ago
parent
commit
f5276b01a7
3 changed files with 204 additions and 120 deletions
  1. 31
    15
      src/main.rs
  2. 108
    0
      src/sender.rs
  3. 65
    105
      src/spool.rs

+ 31
- 15
src/main.rs View File

@@ -3,14 +3,17 @@
#![feature(impl_trait_in_bindings)]
#![feature(async_closure)]

mod config;
mod spool;

use std::collections::*;
use std::path::PathBuf;

use clap::Clap;

use tokio::{self, runtime};
use tokio::runtime;
use tokio::sync::mpsc;

mod config;
mod sender;
mod spool;

use crate::config::*;

@@ -42,7 +45,7 @@ struct Opts {
fn main() {
let opts = Opts::parse();
if opts.reload_trigger.is_some() {
println!("Reload trigger file specified, but this option is not supported yet. Ignoring.");
eprintln!("[init] warning: reload trigger file specified, but this option is not supported yet, ignoring...");
}

let mut rt = make_runtime();
@@ -56,7 +59,7 @@ fn main() {
match rt.block_on(find_configs(&dir)) {
Ok(paths) => confs.extend(paths),
Err(e) => {
println!("Error reading configuration: {:?}", e);
eprintln!("[init] error reading configuration: {:?}", e);
return;
}
}
@@ -64,33 +67,46 @@ fn main() {

// Sanity check.
if confs.len() == 0 {
println!("No configuration declared, exiting...");
println!("[init] no configuration declared, exiting...");
return;
}

// Process configuration.
// TODO Remove all these cases of `block_on` except for a final toplevel task.
let config = match rt.block_on(parse_configs(&confs)) {
Ok(c) => c,
Err(e) => {
println!("Error parsing configuration: {:?}", e);
eprintln!("[init] error parsing configuration: {:?}", e);
return; // maybe
}
};

for s in &config.spool_dirs {
if s.send_delay_sec != 0 {
println!(
"Warning: send delay (as in watch on {:?}) are current not supported, ignoring",
s.path
);
let mut clients = HashMap::new();
let mut sender_futs = Vec::new();

// Init each account and put outgoing channels into a table for later reference.
for cc in config.accounts.iter() {
match rt.block_on(sender::create_and_auth_client(cc.1.clone())) {
Ok(c) => {
let (send, recv) = mpsc::channel(2); // FIXME configurable
clients.insert(cc.0.clone(), send);
sender_futs.push(sender::submit_messages(c, recv));
}
Err(e) => {
eprintln!("[init] error: client setup failed: {:?}", e);
return; // maybe
}
}
}

// This is where the real stuff actually happens.
match rt.block_on(spool::start_spooling(config)) {
match rt.block_on(spool::start_spoolers(config, clients)) {
Ok(_) => {}
Err(e) => println!("fatal error: {:?}", e),
}

// just wait I guess?
rt.block_on(futures::future::join_all(sender_futs.into_iter()));
}

fn make_runtime() -> runtime::Runtime {

+ 108
- 0
src/sender.rs View File

@@ -0,0 +1,108 @@
use std::collections::*;
use std::path::PathBuf;
use std::sync::Arc;

use futures::compat::Stream01CompatExt;
use futures::prelude::*;

use hyper::client::connect::dns::GaiResolver;
use hyper::client::connect::HttpConnector;
use hyper_tls::HttpsConnector;

use inotify::ffi::*;

use tokio::fs as tokiofs;
use tokio::prelude::*;
use tokio::sync::mpsc;

use tokio_inotify;

use url::Url;

use ruma_client::Client;
use ruma_client_api::r0::message as rumamessage;
use ruma_events::{self, room::message::*};
use ruma_identifiers::RoomId;

use crate::config::*;

#[derive(Debug)]
pub enum Error {
BadUrl,
MtxClient(ruma_client::Error),
}

impl From<ruma_client::Error> for Error {
fn from(f: ruma_client::Error) -> Self {
Self::MtxClient(f)
}
}

type MatrixClient = Client<HttpsConnector<HttpConnector<GaiResolver>>>;
type MessageRequest = rumamessage::create_message_event::Request;

pub struct Message {
dest_room: RoomId,
msg: String,
delay_secs: u32,
}

impl Message {
pub fn new(dest_room: RoomId, msg: String) -> Self {
Self::new_delay(dest_room, msg, 0)
}

pub fn new_delay(dest_room: RoomId, msg: String, delay_secs: u32) -> Self {
Self {
dest_room,
msg,
delay_secs,
}
}
}

pub async fn create_and_auth_client(acct: Account) -> Result<Arc<MatrixClient>, Error> {
let hs_url = Url::parse(&acct.homeserver).map_err(|_| Error::BadUrl)?;
let c = MatrixClient::https(hs_url, None);
match acct.auth {
Auth::UsernamePass(un, pw) => c.log_in(un, pw, acct.device_id, acct.display).await?,
};
Ok(Arc::new(c))
}

pub async fn submit_messages(cli: Arc<MatrixClient>, mut recv: mpsc::Receiver<Message>) {
let mut rng = rand::thread_rng();

while let Some(msg) = recv.recv().await {
let req = make_text_request(msg.dest_room.clone(), &msg.msg, &mut rng);
if let Err(e) = cli.request(req).await {
panic!("[client] error sending request: {:?}", e);
}
}
}

fn make_text_request<R: rand::Rng>(room_id: RoomId, msg: &str, rng: &mut R) -> MessageRequest {
let inner = TextMessageEventContent {
body: String::from(msg),
format: None,
formatted_body: None,
relates_to: None,
};
let mec = MessageEventContent::Text(inner);
MessageRequest {
room_id: room_id,
event_type: ruma_events::EventType::RoomMessage,
txn_id: make_txn_id(rng),
data: mec,
}
}

const TXN_ID_LEN: usize = 20;

fn make_txn_id<R: rand::Rng>(rng: &mut R) -> String {
let mut buf = String::with_capacity(TXN_ID_LEN);
for _ in 0..TXN_ID_LEN {
buf.push((rng.gen_range(0, 26) + ('a' as u8)) as char);
}
buf
}

+ 65
- 105
src/spool.rs View File

@@ -1,3 +1,4 @@
use core::time;
use std::collections::*;
use std::path::PathBuf;
use std::sync::Arc;
@@ -13,6 +14,7 @@ use inotify::ffi::*;

use tokio::fs as tokiofs;
use tokio::prelude::*;
use tokio::sync::mpsc;

use tokio_inotify;

@@ -23,7 +25,8 @@ use ruma_client_api::r0::message as rumamessage;
use ruma_events::{self, room::message::*};
use ruma_identifiers::RoomId;

use crate::config::*;
use crate::config::{self, *};
use crate::sender;

type MatrixClient = Client<HttpsConnector<HttpConnector<GaiResolver>>>;
type MessageRequest = rumamessage::create_message_event::Request;
@@ -32,6 +35,7 @@ type MessageRequest = rumamessage::create_message_event::Request;
pub enum Error {
FileFormatMismatch,
BadUrl,
UnspecifiedClient(String),
Io(io::Error),
MtxClient(ruma_client::Error),
}
@@ -56,85 +60,77 @@ struct SpoolAction {
watch_path: PathBuf,
}

pub async fn start_spooling(conf: Config) -> Result<(), Error> {
let ain = tokio_inotify::AsyncINotify::init().expect("inotify init");

let mut clients = HashMap::new();
let mut watch_map = HashMap::new();

for s in conf.spool_dirs {
let label = s.sender_acct_label;
let w = ain.add_watch(&s.path, IN_CLOSE_WRITE | IN_MOVED_TO)?;
println!("Added watch: {}", s.path.to_str().unwrap_or("[non-UTF-8]"));

// TODO Make this better.
let cli = if !clients.contains_key(&label) {
let acc = conf.accounts.get(&label).expect("missing account");
let cli = create_and_auth_client(acc.clone()).await?;
clients.insert(label, cli.clone());
cli
} else {
clients.get(&label).expect("missing account").clone()
};

let sa = SpoolAction {
client: cli,
room: s.dest_room_id,
delay_secs: s.send_delay_sec,
watch_path: s.path.clone(),
};

watch_map.insert(w, sa);
pub async fn start_spoolers(
conf: Config,
client_chans: HashMap<String, mpsc::Sender<sender::Message>>,
) -> Result<(), Error> {
for sd in conf.spool_dirs {
let chan = client_chans
.get(&sd.sender_acct_label)
.ok_or_else(|| Error::UnspecifiedClient(sd.sender_acct_label.clone()))?;

let ain = tokio_inotify::AsyncINotify::init().expect("spool: inotify init");

let w = ain.add_watch(&sd.path, IN_CLOSE_WRITE | IN_MOVED_TO)?;
println!("[spool] added watch: {:?}", sd.path);

tokio::spawn(do_watch_dir(ain, sd, chan.clone()));
}

let watch_map = Arc::new(watch_map);

// This is horrible.
let _ = ain
.compat()
.map_err(Error::from)
.try_for_each({
|e| {
// Not sure why we have to do this clone here outside.
let wm = watch_map.clone();
async move {
// I don't like these clones but idk any better way.
let act = match wm.as_ref().get(&e.wd) {
Some(a) => a.clone(),
None => {
println!(
"got a wd that was not from a watch we added, ignoring: {:?}",
e.wd
);
return Ok(());
}
};

//let pb = e.name.clone();

// TODO This should be spawning a new task.
//tokio::spawn(async move {
// TODO Respect delay.
match process_file(&e.name, &act).await {
Ok(()) => {} // ok I guess?
Err(e) => println!("Error processing file: {:?}", e),
}
//});
Ok(())
}

Ok(())
async fn do_watch_dir(
inot: tokio_inotify::AsyncINotify,
sdc: config::SpoolDir,
mut dest: mpsc::Sender<sender::Message>,
) {
let mut iter = inot.compat();
while let Some(ent) = iter.next().await {
match ent {
Ok(ent) => {
// Just succ up the file and send it over. We'll do the
// formatting later.
let mut real_path = sdc.path.clone();
real_path.push(ent.name);
let s = match file_as_string(&real_path).await {
Ok(s) => s,
Err(e) => {
eprintln!(
"[spool] warning, could not read file, ignoring: {:?}",
real_path
);
continue;
}
};

let msg =
sender::Message::new_delay(sdc.dest_room_id.clone(), s, sdc.send_delay_sec);
let tout = time::Duration::from_secs(30);
dest.send_timeout(msg, tout)
.map_err(|_| ())
.await
.expect("spool: relay channel send timeout");

if let Err(e) = tokiofs::remove_file(&real_path).await {
eprintln!(
"[spool] warning: could not remove sent file, ignoring: {:?}",
real_path
);
}
}
})
.await?;

Ok(())
Err(e) => panic!("spool: error reading watch {:?}", e),
}
}
}

/*
async fn process_file(p: &PathBuf, sa: &SpoolAction) -> Result<(), Error> {
let ext = match p.extension().map(|e| e.to_str()).flatten() {
Some(v) => v,
None => {
println!("Found weird file {:?}, ignoring", p);
eprintln!("Found weird file {:?}, ignoring", p);
return Ok(());
}
};
@@ -167,7 +163,6 @@ async fn process_file(p: &PathBuf, sa: &SpoolAction) -> Result<(), Error> {
match sa.client.as_ref().request(req).await {
Ok(_) => {
// Now delete it if it passed.
tokiofs::remove_file(real_path).await?;
}
Err(e) => println!("Error processing {}: {:?}", name, e),
}
@@ -179,7 +174,7 @@ async fn process_file(p: &PathBuf, sa: &SpoolAction) -> Result<(), Error> {
}

Ok(())
}
}*/

async fn file_as_string(p: &PathBuf) -> Result<String, Error> {
let mut buf = Vec::new();
@@ -187,38 +182,3 @@ async fn file_as_string(p: &PathBuf) -> Result<String, Error> {
f.read_to_end(&mut buf).await?;
String::from_utf8(buf).map_err(|_| Error::FileFormatMismatch)
}

async fn create_and_auth_client(acct: Account) -> Result<Arc<MatrixClient>, Error> {
let hs_url = Url::parse(&acct.homeserver).map_err(|_| Error::BadUrl)?;
let c = MatrixClient::https(hs_url, None);
match acct.auth {
Auth::UsernamePass(un, pw) => c.log_in(un, pw, acct.device_id, acct.display).await?,
};
Ok(Arc::new(c))
}

fn make_text_request<R: rand::Rng>(room_id: RoomId, msg: &str, rng: &mut R) -> MessageRequest {
let inner = TextMessageEventContent {
body: String::from(msg),
format: None,
formatted_body: None,
relates_to: None,
};
let mec = MessageEventContent::Text(inner);
MessageRequest {
room_id: room_id,
event_type: ruma_events::EventType::RoomMessage,
txn_id: make_txn_id(rng),
data: mec,
}
}

const TXN_ID_LEN: usize = 20;

fn make_txn_id<R: rand::Rng>(rng: &mut R) -> String {
let mut buf = String::with_capacity(TXN_ID_LEN);
for _ in 0..TXN_ID_LEN {
buf.push((rng.gen_range(0, 26) + ('a' as u8)) as char);
}
buf
}

Loading…
Cancel
Save