1
0
mirror of https://github.com/openbsd/src.git synced 2025-01-10 06:47:55 -08:00

- queue process no longer schedules messages which do not have flag

F_MESSAGE_COMPLETE
- submit recipients to the queue as we read them from RCPT instead of
	submiting them all at once when DATA is over. this prevents us
	from having to keep a potentially large number of recipients in
	memory during the whole session.
- remove all code that dealt with the recipients queue of a message as
	it is no longer used.
- several small changes to make sure the server is always in a recoverable
	state in case of an unexpected shutdown.
This commit is contained in:
gilles 2008-11-11 01:08:08 +00:00
parent 4277a387fe
commit 93e859c46a
4 changed files with 97 additions and 57 deletions

View File

@ -1,4 +1,4 @@
/* $OpenBSD: queue.c,v 1.8 2008/11/11 01:01:39 chl Exp $ */
/* $OpenBSD: queue.c,v 1.9 2008/11/11 01:08:08 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@ -77,6 +77,7 @@ void queue_register_daemon_message(struct smtpd *, struct batch *, struct messa
void queue_load_submissions(struct smtpd *, time_t);
int queue_message_schedule(struct message *, time_t);
int queue_message_from_id(char *, struct message *);
int queue_message_complete(struct message *);
void
queue_sig_handler(int sig, short event, void *p)
@ -237,6 +238,20 @@ queue_dispatch_smtp(int sig, short event, void *p)
break;
}
case IMSG_QUEUE_MESSAGE_COMPLETE: {
struct message *messagep;
struct submit_status ss;
messagep = imsg.data;
ss.id = messagep->session_id;
queue_message_complete(messagep);
imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1,
&ss, sizeof(ss));
break;
}
default:
log_debug("queue_dispatch_smtp: unexpected imsg %d",
imsg.hdr.type);
@ -1261,6 +1276,53 @@ batch_cmp(struct batch *s1, struct batch *s2)
return (0);
}
int
queue_message_complete(struct message *messagep)
{
DIR *dirp;
struct dirent *dp;
struct message message;
char pathname[MAXPATHLEN];
FILE *fp;
int spret;
dirp = opendir(PATH_ENVELOPES);
if (dirp == NULL)
err(1, "opendir");
while ((dp = readdir(dirp)) != NULL) {
if (dp->d_name[0] == '.')
continue;
if (strncmp(messagep->message_id,
dp->d_name, strlen(messagep->message_id)) != 0)
continue;
spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES,
dp->d_name);
if (spret == -1 || spret >= MAXPATHLEN)
continue;
fp = fopen(pathname, "r");
if (fp == NULL)
continue;
if (fread(&message, 1, sizeof(struct message), fp) !=
sizeof(struct message)) {
fclose(fp);
continue;
}
fclose(fp);
message.flags |= F_MESSAGE_COMPLETE;
queue_update_database(&message);
}
closedir(dirp);
return 1;
}
int
queue_message_schedule(struct message *messagep, time_t tm)
{
@ -1280,6 +1342,9 @@ queue_message_schedule(struct message *messagep, time_t tm)
if ((messagep->flags & F_MESSAGE_READY) == 0)
return 0;
if ((messagep->flags & F_MESSAGE_COMPLETE) == 0)
return 0;
if ((messagep->flags & F_MESSAGE_PROCESSING) != 0)
return 0;

View File

@ -1,4 +1,4 @@
/* $OpenBSD: smtp.c,v 1.2 2008/11/05 12:14:45 sobrado Exp $ */
/* $OpenBSD: smtp.c,v 1.3 2008/11/11 01:08:08 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@ -412,9 +412,6 @@ smtp_dispatch_queue(int sig, short event, void *p)
break;
}
(void)strlcpy(s->s_msg.message_id, ss->u.msgid,
sizeof(s->s_msg.message_id));
session_pickup(s, ss);
break;

View File

@ -1,4 +1,4 @@
/* $OpenBSD: smtp_session.c,v 1.4 2008/11/10 23:18:47 gilles Exp $ */
/* $OpenBSD: smtp_session.c,v 1.5 2008/11/11 01:08:08 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@ -312,13 +312,6 @@ session_rfc5321_ehlo_handler(struct session *s, char *args)
int
session_rfc5321_rset_handler(struct session *s, char *args)
{
struct path *pathp;
while ((pathp = TAILQ_FIRST(&s->s_msg.recipients)) != NULL) {
TAILQ_REMOVE(&s->s_msg.recipients, pathp, entry);
free(pathp);
}
s->s_msg.rcptcount = 0;
s->s_state = S_HELO;
evbuffer_add_printf(s->s_bev->output, "250 Reset state.\r\n");
@ -337,7 +330,6 @@ session_rfc5321_noop_handler(struct session *s, char *args)
int
session_rfc5321_mail_handler(struct session *s, char *args)
{
struct path *pathp;
char buffer[MAX_PATH_SIZE];
if (s->s_state == S_GREETED) {
@ -359,13 +351,9 @@ session_rfc5321_mail_handler(struct session *s, char *args)
return 1;
}
while ((pathp = TAILQ_FIRST(&s->s_msg.recipients)) != NULL) {
TAILQ_REMOVE(&s->s_msg.recipients, pathp, entry);
free(pathp);
}
s->s_msg.rcptcount = 0;
s->s_state = S_MAILGETFILE;
s->s_state = S_MAILREQUEST;
s->s_flags |= F_IMSG_SENT;
s->s_msg.id = s->s_id;
s->s_msg.session_id = s->s_id;
@ -414,9 +402,8 @@ session_rfc5321_rcpt_handler(struct session *s, char *args)
mr.id = s->s_msg.id;
s->s_state = S_RCPT;
s->s_state = S_RCPTREQUEST;
s->s_flags |= F_IMSG_SENT;
mr.ss = s->s_ss;
imsg_compose(s->s_env->sc_ibufs[PROC_MFA], IMSG_MFA_RCPT_SUBMIT,
@ -553,8 +540,6 @@ session_command(struct session *s, char *cmd, char *args)
void
session_pickup(struct session *s, struct submit_status *ss)
{
struct path *path;
if (s == NULL)
fatal("session_pickup: desynchronized");
@ -590,7 +575,7 @@ session_pickup(struct session *s, struct submit_status *ss)
}
break;
case S_MAILGETFILE:
case S_MAILREQUEST:
/* sender was not accepted, downgrade state */
if (ss->code != 250) {
s->s_state = S_HELO;
@ -600,6 +585,8 @@ session_pickup(struct session *s, struct submit_status *ss)
}
s->s_state = S_MAIL;
s->s_msg.sender = ss->u.path;
imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE],
IMSG_QUEUE_CREATE_MESSAGE_FILE, 0, 0, -1, &s->s_msg,
sizeof(s->s_msg));
@ -607,11 +594,11 @@ session_pickup(struct session *s, struct submit_status *ss)
break;
case S_MAIL:
s->s_msg.sender = ss->u.path;
evbuffer_add_printf(s->s_bev->output,
"%d Sender ok\r\n", ss->code);
strlcpy(s->s_msg.message_id, ss->u.msgid, MAXPATHLEN);
if (s->s_msg.datafp == NULL) {
/* Remove message file */
imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_DELETE_MESSAGE_FILE,
@ -620,23 +607,29 @@ session_pickup(struct session *s, struct submit_status *ss)
}
break;
case S_RCPT:
case S_RCPTREQUEST:
/* recipient was not accepted */
if (ss->code != 250) {
/* We do not have a valid recipient, downgrade state */
if (s->s_msg.rcptcount == 0)
s->s_state = S_MAIL;
else
s->s_state = S_RCPT;
evbuffer_add_printf(s->s_bev->output,
"%d %s\r\n", ss->code, "Recipient rejected");
return;
}
path = calloc(1, sizeof(struct path));
if (path == NULL)
err(1, "calloc");
*path = ss->u.path;
TAILQ_INSERT_TAIL(&s->s_msg.recipients, path, entry);
s->s_state = S_RCPT;
s->s_msg.rcptcount++;
s->s_msg.recipient = ss->u.path;
imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE],
IMSG_QUEUE_MESSAGE_SUBMIT, 0, 0, -1, &s->s_msg,
sizeof(s->s_msg));
break;
case S_RCPT:
evbuffer_add_printf(s->s_bev->output,
"%d Recipient ok\r\n", ss->code);
break;
@ -658,10 +651,6 @@ session_pickup(struct session *s, struct submit_status *ss)
break;
case S_DONE:
s->s_msg.rcptcount--;
if (s->s_msg.rcptcount)
return;
s->s_state = S_HELO;
s->s_msg.datafp = NULL;
evbuffer_add_printf(s->s_bev->output,
@ -684,10 +673,8 @@ session_init(struct listener *l, struct session *s)
s->s_env = l->env;
s->s_l = l;
s->s_id = queue_generate_id();
strlcpy(s->s_hostname, "<unknown>", MAXHOSTNAMELEN);
TAILQ_INIT(&s->s_msg.recipients);
strlcpy(s->s_msg.session_hostname, s->s_hostname, MAXHOSTNAMELEN);
SPLAY_INSERT(sessiontree, &s->s_env->sc_sessions, s);
@ -812,8 +799,6 @@ session_write(struct bufferevent *bev, void *p)
void
session_destroy(struct session *s)
{
struct path *pathp;
/*
* cleanup
*/
@ -834,11 +819,6 @@ session_destroy(struct session *s)
}
ssl_session_destroy(s);
while ((pathp = TAILQ_FIRST(&s->s_msg.recipients)) != NULL) {
TAILQ_REMOVE(&s->s_msg.recipients, pathp, entry);
free(pathp);
}
SPLAY_REMOVE(sessiontree, &s->s_env->sc_sessions, s);
bzero(s, sizeof(*s));
free(s);
@ -855,15 +835,10 @@ session_error(struct bufferevent *bev, short event, void *p)
void
session_msg_submit(struct session *s)
{
struct path *rpath;
strlcpy(s->s_msg.session_hostname, s->s_hostname, MAXHOSTNAMELEN);
TAILQ_FOREACH(rpath, &s->s_msg.recipients, entry) {
s->s_msg.recipient = *rpath;
imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE],
IMSG_QUEUE_MESSAGE_SUBMIT, 0, 0, -1, &s->s_msg,
sizeof(s->s_msg));
}
imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE],
IMSG_QUEUE_MESSAGE_COMPLETE, 0, 0, -1, &s->s_msg,
sizeof(s->s_msg));
s->s_state = S_DONE;
}
int

View File

@ -1,4 +1,4 @@
/* $OpenBSD: smtpd.h,v 1.8 2008/11/10 23:18:47 gilles Exp $ */
/* $OpenBSD: smtpd.h,v 1.9 2008/11/11 01:08:08 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@ -166,6 +166,8 @@ enum imsg_type {
IMSG_QUEUE_DELETE_MESSAGE_FILE,
IMSG_QUEUE_MESSAGE_SUBMIT,
IMSG_QUEUE_MESSAGE_UPDATE,
IMSG_QUEUE_MESSAGE_COMPLETE,
IMSG_QUEUE_MESSAGE_ACK,
IMSG_QUEUE_BATCH_COMPLETE,
IMSG_QUEUE_BATCH_CLOSE,
IMSG_QUEUE_MESSAGE_FD,
@ -513,8 +515,9 @@ enum session_state {
S_TLS,
S_AUTH,
S_HELO,
S_MAILGETFILE,
S_MAILREQUEST,
S_MAIL,
S_RCPTREQUEST,
S_RCPT,
S_DATA,
S_DATACONTENT,