migrate multiple files from a folder

This commit is contained in:
boring_nick 2023-10-28 18:41:01 +03:00
parent 6e08dc2cf9
commit fc50c042e5
4 changed files with 67 additions and 59 deletions

View File

@ -24,7 +24,7 @@ pub enum Command {
},
MigrateSupibot {
#[clap(short, long)]
file: PathBuf,
logs_dir: PathBuf,
#[clap(short, long)]
users_file: PathBuf,
},

View File

@ -76,9 +76,10 @@ async fn main() -> anyhow::Result<()> {
channel_id,
jobs,
}) => migrate(db, source_dir, channel_id, jobs).await,
Some(Command::MigrateSupibot { file, users_file }) => {
migrator::supibot::run(config, db, &file, &users_file).await
}
Some(Command::MigrateSupibot {
logs_dir,
users_file,
}) => migrator::supibot::run(config, db, &logs_dir, &users_file).await,
}
}

View File

@ -28,7 +28,7 @@ const USERS_REQUEST_CHUNK_SIZE: usize = 50;
pub async fn run(
config: Config,
db: clickhouse::Client,
file_path: &Path,
logs_path: &Path,
users_file_path: &Path,
) -> anyhow::Result<()> {
let mut users_client = UsersClient::default();
@ -36,53 +36,66 @@ pub async fn run(
.add_from_file(users_file_path)
.context("Could not read the users file")?;
info!("Loading file {file_path:?}");
let file_name = file_path
.file_name()
.context("Provided file does not have a name")?;
let channel_name = file_name
.to_str()
.context("Invalid UTF-8 in file name")?
.split('.')
.next()
.context("Could not extract file name")?;
let read_dir = std::fs::read_dir(logs_path)?;
let channel_id = users_client
.get_user_id_by_name(channel_name)
.await?
.context("Could not get channel id")?;
for entry in read_dir {
let entry = entry?;
let file_name = entry.file_name();
let file_name = file_name.to_str().expect("File name is invalid UTF-8");
let inserter = db
.inserter(MESSAGES_TABLE)?
.with_timeouts(
Some(Duration::from_secs(30)),
Some(Duration::from_secs(180)),
)
.with_max_entries(INSERT_BATCH_SIZE)
.with_period(Some(Duration::from_secs(15)));
if let Some(channel_name) = file_name.strip_suffix(".csv") {
let channel_id = users_client
.get_user_id_by_name(channel_name)
.await?
.context("Could not get channel id")?;
info!("Migrating file {file_name}, channel id {channel_id}");
let existing_dates = get_existing_dates(&db, &channel_id).await?;
let inserter = db
.inserter(MESSAGES_TABLE)?
.with_timeouts(
Some(Duration::from_secs(30)),
Some(Duration::from_secs(180)),
)
.with_max_entries(INSERT_BATCH_SIZE)
.with_period(Some(Duration::from_secs(15)));
let migrator = SupibotMigrator {
non_cached_messages: HashMap::with_capacity(USERS_REQUEST_CHUNK_SIZE),
inserter,
channel_id: channel_id.to_owned(),
channel_login: channel_name.to_owned(),
invalid_user_ids: HashSet::new(),
existing_dates,
imported_count: 0,
skipped_count: 0,
};
let existing_dates = get_existing_dates(&db, &channel_id).await?;
info!("Adding channel {channel_id} to config");
config
.channels
.write()
.unwrap()
.insert(channel_id.to_owned());
config.save()?;
let migrator = SupibotMigrator {
non_cached_messages: HashMap::with_capacity(USERS_REQUEST_CHUNK_SIZE),
inserter,
channel_id: channel_id.to_owned(),
channel_login: channel_name.to_owned(),
invalid_user_ids: HashSet::new(),
existing_dates,
imported_count: 0,
skipped_count: 0,
};
migrator.migrate_channel(file_path, users_client).await
info!("Adding channel {channel_id} to config");
config
.channels
.write()
.unwrap()
.insert(channel_id.to_owned());
config.save()?;
match migrator
.migrate_channel(&entry.path(), &mut users_client)
.await
{
Ok((imported_count, skipped_count)) => {
info!("Channel {channel_name} successfully migrated:");
info!("Imported {imported_count} messages");
info!("{skipped_count} messages were skipped due to duplicate dates",);
}
Err(err) => {
error!("Could not migrate channel {channel_name}: {err:#}");
}
}
}
}
Ok(())
}
async fn get_existing_dates(
@ -136,8 +149,8 @@ impl SupibotMigrator {
async fn migrate_channel(
mut self,
file_path: &Path,
mut users_client: UsersClient,
) -> anyhow::Result<()> {
users_client: &mut UsersClient,
) -> anyhow::Result<(u64, u64)> {
let file = File::open(file_path)?;
let reader = BufReader::new(file);
let rdr = csv::Reader::from_reader(reader);
@ -161,7 +174,7 @@ impl SupibotMigrator {
.push(supibot_message);
if self.non_cached_messages.len() >= USERS_REQUEST_CHUNK_SIZE {
self.flush_non_cached(&mut users_client).await?;
self.flush_non_cached(users_client).await?;
}
}
} else {
@ -187,7 +200,7 @@ impl SupibotMigrator {
);
}
}
self.flush_non_cached(&mut users_client).await?;
self.flush_non_cached(users_client).await?;
let stats = self
.inserter
@ -201,17 +214,11 @@ impl SupibotMigrator {
);
}
info!("Imported {} messages", self.imported_count);
info!(
"{} messages were skipped due to duplicate dates",
self.skipped_count
);
if !self.invalid_user_ids.is_empty() {
error!("Invalid user ids: {:?}", self.invalid_user_ids);
}
Ok(())
Ok((self.imported_count, self.skipped_count))
}
async fn flush_non_cached(&mut self, users_client: &mut UsersClient) -> anyhow::Result<()> {

View File

@ -96,7 +96,7 @@ impl UsersClient {
pub async fn get_user_id_by_name(&mut self, name: &str) -> anyhow::Result<Option<String>> {
match self.names.get(name) {
Some(id) => Ok(id.as_ref().map(|id| self.users.get(id).cloned().unwrap())),
Some(id) => Ok(id.clone()),
None => {
debug!("Fetching info for name {name}");
let response = self
@ -123,7 +123,7 @@ impl UsersClient {
Some(user) => {
self.names.insert(user.login.clone(), Some(user.id.clone()));
self.users.insert(user.id.clone(), user.login.clone());
Ok(Some(user.login))
Ok(Some(user.id))
}
None => {
warn!("User {name} cannot be retrieved");