1
0
mirror of https://github.com/openbsd/src.git synced 2025-01-10 06:47:55 -08:00

- introduce queue_init_submissions() which will sanitize the disk-based

queue at startup: catches left overs from interrupted sessions,
	reset F_MESSAGE_INPROCESS so that messages which were in MTA or
	MDA gets scheduled again.
- temporarily comment chl@'s O_EXLOCK -> fcntl change until we figure
	why it locks my mailbox under load
This commit is contained in:
gilles 2008-11-11 21:13:14 +00:00
parent e2ba6b9872
commit 851c912ec6

View File

@ -1,4 +1,4 @@
/* $OpenBSD: queue.c,v 1.9 2008/11/11 01:08:08 gilles Exp $ */
/* $OpenBSD: queue.c,v 1.10 2008/11/11 21:13:14 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@ -78,6 +78,7 @@ void queue_load_submissions(struct smtpd *, time_t);
int queue_message_schedule(struct message *, time_t);
int queue_message_from_id(char *, struct message *);
int queue_message_complete(struct message *);
int queue_init_submissions(void);
void
queue_sig_handler(int sig, short event, void *p)
@ -572,10 +573,10 @@ queue_timeout(int fd, short event, void *p)
batchp != NULL;
batchp = nxt) {
nxt = SPLAY_NEXT(batchtree, &env->batch_queue, batchp);
if ((batchp->type & T_MTA_BATCH) &&
(batchp->flags & F_BATCH_RESOLVED) == 0)
(batchp->flags & F_BATCH_RESOLVED) == 0) {
continue;
}
batch_send(env, batchp, curtime);
@ -620,6 +621,7 @@ queue_load_submissions(struct smtpd *env, time_t tm)
}
continue;
}
message.lasttry = tm;
message.flags |= F_MESSAGE_PROCESSING;
queue_update_database(&message);
@ -636,7 +638,6 @@ queue_load_submissions(struct smtpd *env, time_t tm)
batchp = queue_record_batch(env, messagep);
if (messagep->batch_id == 0)
messagep->batch_id = batchp->id;
}
closedir(dirp);
@ -724,6 +725,11 @@ queue(struct smtpd *env)
fatal("queue: cannot drop privileges");
#endif
SPLAY_INIT(&env->batch_queue);
queue_init_submissions();
queue_load_submissions(env, time(NULL));
event_init();
signal_set(&ev_sigint, SIGINT, queue_sig_handler, env);
@ -735,8 +741,6 @@ queue(struct smtpd *env)
config_peers(env, peers, 5);
SPLAY_INIT(&env->batch_queue);
queue_setup_events(env);
event_dispatch();
queue_shutdown();
@ -809,7 +813,7 @@ queue_record_submission(struct message *message)
char *spool;
size_t spoolsz;
int fd;
int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC;
int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC|O_EXCL;
int spret;
FILE *fp;
int hm;
@ -859,8 +863,8 @@ queue_record_submission(struct message *message)
if (unlink(linkname) == -1)
fatal("queue_record_submission: unlink");
if (flock(fd, LOCK_EX) == -1)
fatal("queue_record_submission: flock");
// if (flock(fd, LOCK_EX) == -1)
// fatal("queue_record_submission: flock");
fp = fdopen(fd, "w");
if (fp == NULL)
@ -1136,9 +1140,6 @@ queue_update_database(struct message *message)
if ((fd = open(pathname, O_RDWR)) == -1)
fatal("queue_update_database: cannot open database");
if (flock(fd, LOCK_EX) == -1)
fatal("queue_update_database: cannot get a lock on database");
fp = fdopen(fd, "w");
if (fp == NULL)
fatal("fdopen");
@ -1163,7 +1164,7 @@ queue_record_daemon(struct message *message)
char message_uid[MAXPATHLEN];
size_t spoolsz;
int fd;
int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC;
int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC|O_EXLOCK;
int spret;
FILE *fp;
@ -1200,8 +1201,8 @@ queue_record_daemon(struct message *message)
if (unlink(linkname) == -1)
err(1, "unlink");
if (flock(fd, LOCK_EX) == -1)
err(1, "flock");
// if (flock(fd, LOCK_EX) == -1)
// err(1, "flock");
fp = fdopen(fd, "w");
if (fp == NULL)
@ -1276,6 +1277,53 @@ batch_cmp(struct batch *s1, struct batch *s2)
return (0);
}
int
queue_init_submissions(void)
{
DIR *dirp;
struct dirent *dp;
struct message message;
char pathname[MAXPATHLEN];
FILE *fp;
int spret;
dirp = opendir(PATH_ENVELOPES);
if (dirp == NULL)
err(1, "opendir");
while ((dp = readdir(dirp)) != NULL) {
if (dp->d_name[0] == '.')
continue;
spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES,
dp->d_name);
if (spret == -1 || spret >= MAXPATHLEN)
continue;
fp = fopen(pathname, "r");
if (fp == NULL)
continue;
if (fread(&message, 1, sizeof(struct message), fp) !=
sizeof(struct message)) {
fclose(fp);
continue;
}
fclose(fp);
if ((message.flags & F_MESSAGE_COMPLETE) == 0)
unlink(pathname);
else {
message.flags &= ~F_MESSAGE_PROCESSING;
queue_update_database(&message);
}
}
closedir(dirp);
return 1;
}
int
queue_message_complete(struct message *messagep)
{