tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

bufferevent.c (26728B)


      1 /*
      2 * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu>
      3 * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson
      4 *
      5 * Redistribution and use in source and binary forms, with or without
      6 * modification, are permitted provided that the following conditions
      7 * are met:
      8 * 1. Redistributions of source code must retain the above copyright
      9 *    notice, this list of conditions and the following disclaimer.
     10 * 2. Redistributions in binary form must reproduce the above copyright
     11 *    notice, this list of conditions and the following disclaimer in the
     12 *    documentation and/or other materials provided with the distribution.
     13 * 3. The name of the author may not be used to endorse or promote products
     14 *    derived from this software without specific prior written permission.
     15 *
     16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26 */
     27 
     28 #include "event2/event-config.h"
     29 #include "evconfig-private.h"
     30 
     31 #include <sys/types.h>
     32 
     33 #ifdef EVENT__HAVE_SYS_TIME_H
     34 #include <sys/time.h>
     35 #endif
     36 
     37 #include <errno.h>
     38 #include <stdio.h>
     39 #include <stdlib.h>
     40 #include <string.h>
     41 #ifdef EVENT__HAVE_STDARG_H
     42 #include <stdarg.h>
     43 #endif
     44 
     45 #ifdef _WIN32
     46 #include <winsock2.h>
     47 #endif
     48 
     49 #include "event2/util.h"
     50 #include "event2/buffer.h"
     51 #include "event2/buffer_compat.h"
     52 #include "event2/bufferevent.h"
     53 #include "event2/bufferevent_struct.h"
     54 #include "event2/bufferevent_compat.h"
     55 #include "event2/event.h"
     56 #include "event-internal.h"
     57 #include "log-internal.h"
     58 #include "mm-internal.h"
     59 #include "bufferevent-internal.h"
     60 #include "evbuffer-internal.h"
     61 #include "util-internal.h"
     62 
     63 static void bufferevent_cancel_all_(struct bufferevent *bev);
     64 static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
     65 
/* Stop delivering reads to BUFEV for reason WHAT (a bitmask).  The backend
 * is told to stop reading only on the first suspension; later calls just
 * accumulate reasons.  Requires no lock held; takes the bufferevent lock. */
void
bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
	BEV_LOCK(bufev);
	if (!bufev_private->read_suspended)
		bufev->be_ops->disable(bufev, EV_READ);
	bufev_private->read_suspended |= what;
	BEV_UNLOCK(bufev);
}
     76 
/* Clear suspension reason WHAT for reading.  Reading is re-enabled in the
 * backend only once every reason is cleared AND the user still has EV_READ
 * enabled. */
void
bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
	BEV_LOCK(bufev);
	bufev_private->read_suspended &= ~what;
	if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
		bufev->be_ops->enable(bufev, EV_READ);
	BEV_UNLOCK(bufev);
}
     87 
/* Stop flushing writes from BUFEV for reason WHAT.  Mirrors
 * bufferevent_suspend_read_: backend disabled on first suspension only. */
void
bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
	BEV_LOCK(bufev);
	if (!bufev_private->write_suspended)
		bufev->be_ops->disable(bufev, EV_WRITE);
	bufev_private->write_suspended |= what;
	BEV_UNLOCK(bufev);
}
     98 
/* Clear suspension reason WHAT for writing; re-enable the backend writer
 * once no reasons remain and the user still has EV_WRITE enabled. */
void
bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
	BEV_LOCK(bufev);
	bufev_private->write_suspended &= ~what;
	if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE))
		bufev->be_ops->enable(bufev, EV_WRITE);
	BEV_UNLOCK(bufev);
}
    109 
/**
 * Some bufferevent implementations (openssl, for example) can overrun the
 * read high-watermark.  If that happens and the user's read callback does
 * not drain enough data to get back below the watermark, reading stays
 * suspended and the read callback would never fire again.
 *
 * To avoid that, re-schedule the read callback here — but only from the
 * user-callback paths, to avoid scheduling it more than once:
 * - after data has been added to the input buffer
 * - after data has been drained from it (in the user's read callback)
 */
static void bufferevent_inbuf_wm_check(struct bufferevent *bev)
{
	/* Nothing to do without a high watermark or with reading disabled. */
	if (!bev->wm_read.high)
		return;
	if (!(bev->enabled & EV_READ))
		return;
	if (evbuffer_get_length(bev->input) < bev->wm_read.high)
		return;

	/* Still at/over the watermark: fire the read callback again, deferred. */
	bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
}
    132 
    133 /* Callback to implement watermarks on the input buffer.  Only enabled
    134 * if the watermark is set. */
    135 static void
    136 bufferevent_inbuf_wm_cb(struct evbuffer *buf,
    137    const struct evbuffer_cb_info *cbinfo,
    138    void *arg)
    139 {
    140 struct bufferevent *bufev = arg;
    141 size_t size;
    142 
    143 size = evbuffer_get_length(buf);
    144 
    145 if (size >= bufev->wm_read.high)
    146 	bufferevent_wm_suspend_read(bufev);
    147 else
    148 	bufferevent_wm_unsuspend_read(bufev);
    149 }
    150 
/* Deferred-callback dispatcher used when BEV_OPT_UNLOCK_CALLBACKS is NOT
 * set: user callbacks run with the bufferevent lock held.  Consumes the
 * reference taken by SCHEDULE_DEFERRED. */
static void
bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
	}
	if (bufev_private->readcb_pending && bufev->readcb) {
		bufev_private->readcb_pending = 0;
		bufev->readcb(bufev, bufev->cbarg);
		/* May need to reschedule if we are still over the watermark. */
		bufferevent_inbuf_wm_check(bufev);
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufev_private->writecb_pending = 0;
		bufev->writecb(bufev, bufev->cbarg);
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		/* Restore the socket error the event was recorded with. */
		EVUTIL_SET_SOCKET_ERROR(err);
		bufev->errorcb(bufev, what, bufev->cbarg);
	}
	/* Drop the reference taken when this callback was scheduled. */
	bufferevent_decref_and_unlock_(bufev);
}
    184 
/* Deferred-callback dispatcher used with BEV_OPT_UNLOCK_CALLBACKS: each
 * user callback runs with the bufferevent lock RELEASED (callback pointers
 * and cbarg are copied while locked, since they may change concurrently). */
static void
bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
/* Run STMT with the lock dropped, then re-acquire it. */
#define UNLOCKED(stmt) \
	do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)

	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
	}
	if (bufev_private->readcb_pending && bufev->readcb) {
		bufferevent_data_cb readcb = bufev->readcb;
		void *cbarg = bufev->cbarg;
		bufev_private->readcb_pending = 0;
		UNLOCKED(readcb(bufev, cbarg));
		/* May need to reschedule if we are still over the watermark. */
		bufferevent_inbuf_wm_check(bufev);
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufferevent_data_cb writecb = bufev->writecb;
		void *cbarg = bufev->cbarg;
		bufev_private->writecb_pending = 0;
		UNLOCKED(writecb(bufev, cbarg));
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		EVUTIL_SET_SOCKET_ERROR(err);
		UNLOCKED(errorcb(bufev,what,cbarg));
	}
	/* Drop the reference taken when this callback was scheduled. */
	bufferevent_decref_and_unlock_(bufev);
#undef UNLOCKED
}
    230 
/* Schedule BEVP's deferred-callback dispatcher on its event base.  If the
 * callback was not already pending, take a reference that the dispatcher
 * drops when it runs. */
#define SCHEDULE_DEFERRED(bevp)						\
	do {								\
		if (event_deferred_cb_schedule_(			\
			    (bevp)->bev.ev_base,			\
			&(bevp)->deferred))				\
			bufferevent_incref_(&(bevp)->bev);		\
	} while (0)
    238 
    239 
/* Invoke (or schedule) the user read callback.  OPTIONS may force deferral
 * via BEV_OPT_DEFER_CALLBACKS even if the bufferevent wasn't created with
 * it.  Requires that we hold the lock and a reference. */
void
bufferevent_run_readcb_(struct bufferevent *bufev, int options)
{
	/* Requires that we hold the lock and a reference */
	struct bufferevent_private *p = BEV_UPCAST(bufev);
	if (bufev->readcb == NULL)
		return;
	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
		p->readcb_pending = 1;
		SCHEDULE_DEFERRED(p);
	} else {
		bufev->readcb(bufev, bufev->cbarg);
		/* Re-arm the read callback if still over the watermark. */
		bufferevent_inbuf_wm_check(bufev);
	}
}
    255 
/* Invoke (or schedule) the user write callback; see bufferevent_run_readcb_.
 * Requires that we hold the lock and a reference. */
void
bufferevent_run_writecb_(struct bufferevent *bufev, int options)
{
	/* Requires that we hold the lock and a reference */
	struct bufferevent_private *p = BEV_UPCAST(bufev);
	if (bufev->writecb == NULL)
		return;
	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
		p->writecb_pending = 1;
		SCHEDULE_DEFERRED(p);
	} else {
		bufev->writecb(bufev, bufev->cbarg);
	}
}
    270 
/* Mask of the option bits bufferevent_trigger*() accept from callers. */
#define BEV_TRIG_ALL_OPTS (			\
	BEV_TRIG_IGNORE_WATERMARKS|	\
	BEV_TRIG_DEFER_CALLBACKS	\
)

/* Public entry point: manually fire the read and/or write callback for
 * BUFEV, as if the event(s) in IOTYPE had occurred. */
void
bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
{
	bufferevent_incref_and_lock_(bufev);
	bufferevent_trigger_nolock_(bufev, iotype, options&BEV_TRIG_ALL_OPTS);
	bufferevent_decref_and_unlock_(bufev);
}
    283 
/* Invoke (or schedule) the user event callback with flags WHAT.  When
 * deferring, the current socket error is captured so the deferred dispatch
 * can restore it.  Requires that we hold the lock and a reference. */
void
bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options)
{
	/* Requires that we hold the lock and a reference */
	struct bufferevent_private *p = BEV_UPCAST(bufev);
	if (bufev->errorcb == NULL)
		return;
	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
		p->eventcb_pending |= what;
		p->errno_pending = EVUTIL_SOCKET_ERROR();
		SCHEDULE_DEFERRED(p);
	} else {
		bufev->errorcb(bufev, what, bufev->cbarg);
	}
}
    299 
/* Public entry point: manually fire the event callback with flags WHAT. */
void
bufferevent_trigger_event(struct bufferevent *bufev, short what, int options)
{
	bufferevent_incref_and_lock_(bufev);
	bufferevent_run_eventcb_(bufev, what, options&BEV_TRIG_ALL_OPTS);
	bufferevent_decref_and_unlock_(bufev);
}
    307 
/* Shared initialization for every bufferevent type.  Sets up the evbuffers,
 * refcount, timeouts, rate limiting, optional locking, and the deferred
 * callback dispatcher.  Returns 0 on success, -1 on failure (in which case
 * the evbuffers are freed — even ones the caller pre-installed). */
int
bufferevent_init_common_(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
	struct bufferevent *bufev = &bufev_private->bev;

	if (!bufev->input) {
		if ((bufev->input = evbuffer_new()) == NULL)
			goto err;
	}

	if (!bufev->output) {
		if ((bufev->output = evbuffer_new()) == NULL)
			goto err;
	}

	bufev_private->refcnt = 1;
	bufev->ev_base = base;

	/* Disable timeouts. */
	evutil_timerclear(&bufev->timeout_read);
	evutil_timerclear(&bufev->timeout_write);

	bufev->be_ops = ops;

	if (bufferevent_ratelim_init_(bufev_private))
		goto err;

	/*
	 * Set to EV_WRITE so that using bufferevent_write is going to
	 * trigger a callback.  Reading needs to be explicitly enabled
	 * because otherwise no data will be available.
	 */
	bufev->enabled = EV_WRITE;

#ifndef EVENT__DISABLE_THREAD_SUPPORT
	if (options & BEV_OPT_THREADSAFE) {
		if (bufferevent_enable_locking_(bufev, NULL) < 0)
			goto err;
	}
#endif
	/* UNLOCK_CALLBACKS only makes sense for deferred dispatch. */
	if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
	    == BEV_OPT_UNLOCK_CALLBACKS) {
		event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
		goto err;
	}
	/* Deferred callbacks run at the base's middle priority. */
	if (options & BEV_OPT_UNLOCK_CALLBACKS)
		event_deferred_cb_init_(
		    &bufev_private->deferred,
		    event_base_get_npriorities(base) / 2,
		    bufferevent_run_deferred_callbacks_unlocked,
		    bufev_private);
	else
		event_deferred_cb_init_(
		    &bufev_private->deferred,
		    event_base_get_npriorities(base) / 2,
		    bufferevent_run_deferred_callbacks_locked,
		    bufev_private);

	bufev_private->options = options;

	evbuffer_set_parent_(bufev->input, bufev);
	evbuffer_set_parent_(bufev->output, bufev);

	return 0;

err:
	if (bufev->input) {
		evbuffer_free(bufev->input);
		bufev->input = NULL;
	}
	if (bufev->output) {
		evbuffer_free(bufev->output);
		bufev->output = NULL;
	}
	return -1;
}
    387 
/* Replace all three user callbacks and the shared callback argument.
 * Any callback may be NULL to disable it. */
void
bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
	BEV_LOCK(bufev);

	bufev->readcb = readcb;
	bufev->writecb = writecb;
	bufev->errorcb = eventcb;

	bufev->cbarg = cbarg;
	BEV_UNLOCK(bufev);
}
    402 
    403 void
    404 bufferevent_getcb(struct bufferevent *bufev,
    405    bufferevent_data_cb *readcb_ptr,
    406    bufferevent_data_cb *writecb_ptr,
    407    bufferevent_event_cb *eventcb_ptr,
    408    void **cbarg_ptr)
    409 {
    410 BEV_LOCK(bufev);
    411 if (readcb_ptr)
    412 	*readcb_ptr = bufev->readcb;
    413 if (writecb_ptr)
    414 	*writecb_ptr = bufev->writecb;
    415 if (eventcb_ptr)
    416 	*eventcb_ptr = bufev->errorcb;
    417 if (cbarg_ptr)
    418 	*cbarg_ptr = bufev->cbarg;
    419 
    420 BEV_UNLOCK(bufev);
    421 }
    422 
/* Return the input (read-side) evbuffer.  Not locked: the pointer is
 * immutable after initialization. */
struct evbuffer *
bufferevent_get_input(struct bufferevent *bufev)
{
	return bufev->input;
}
    428 
/* Return the output (write-side) evbuffer.  Not locked: the pointer is
 * immutable after initialization. */
struct evbuffer *
bufferevent_get_output(struct bufferevent *bufev)
{
	return bufev->output;
}
    434 
/* Return the event_base this bufferevent was created on. */
struct event_base *
bufferevent_get_base(struct bufferevent *bufev)
{
	return bufev->ev_base;
}
    440 
    441 int
    442 bufferevent_get_priority(const struct bufferevent *bufev)
    443 {
    444 if (event_initialized(&bufev->ev_read)) {
    445 	return event_get_priority(&bufev->ev_read);
    446 } else {
    447 	return event_base_get_npriorities(bufev->ev_base) / 2;
    448 }
    449 }
    450 
    451 int
    452 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
    453 {
    454 if (evbuffer_add(bufev->output, data, size) == -1)
    455 	return (-1);
    456 
    457 return 0;
    458 }
    459 
    460 int
    461 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
    462 {
    463 if (evbuffer_add_buffer(bufev->output, buf) == -1)
    464 	return (-1);
    465 
    466 return 0;
    467 }
    468 
    469 size_t
    470 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
    471 {
    472 return (evbuffer_remove(bufev->input, data, size));
    473 }
    474 
    475 int
    476 bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
    477 {
    478 return (evbuffer_add_buffer(buf, bufev->input));
    479 }
    480 
/* Enable reading and/or writing (EVENT is EV_READ|EV_WRITE bits).  The
 * user-visible 'enabled' mask is always updated, but the backend is only
 * told to start I/O for directions that are not currently suspended
 * (watermarks, rate limits, ...).  Returns 0 on success, -1 on failure. */
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
	short impl_events = event;
	int r = 0;

	bufferevent_incref_and_lock_(bufev);
	/* Don't pass suspended directions down to the backend. */
	if (bufev_private->read_suspended)
		impl_events &= ~EV_READ;
	if (bufev_private->write_suspended)
		impl_events &= ~EV_WRITE;

	bufev->enabled |= event;

	if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
		r = -1;
	if (r)
		event_debug(("%s: cannot enable 0x%hx on %p", __func__, event, bufev));

	bufferevent_decref_and_unlock_(bufev);
	return r;
}
    504 
    505 int
    506 bufferevent_set_timeouts(struct bufferevent *bufev,
    507 		 const struct timeval *tv_read,
    508 		 const struct timeval *tv_write)
    509 {
    510 int r = 0;
    511 BEV_LOCK(bufev);
    512 if (tv_read) {
    513 	bufev->timeout_read = *tv_read;
    514 } else {
    515 	evutil_timerclear(&bufev->timeout_read);
    516 }
    517 if (tv_write) {
    518 	bufev->timeout_write = *tv_write;
    519 } else {
    520 	evutil_timerclear(&bufev->timeout_write);
    521 }
    522 
    523 if (bufev->be_ops->adj_timeouts)
    524 	r = bufev->be_ops->adj_timeouts(bufev);
    525 BEV_UNLOCK(bufev);
    526 
    527 return r;
    528 }
    529 
    530 
    531 /* Obsolete; use bufferevent_set_timeouts */
    532 void
    533 bufferevent_settimeout(struct bufferevent *bufev,
    534 	       int timeout_read, int timeout_write)
    535 {
    536 struct timeval tv_read, tv_write;
    537 struct timeval *ptv_read = NULL, *ptv_write = NULL;
    538 
    539 memset(&tv_read, 0, sizeof(tv_read));
    540 memset(&tv_write, 0, sizeof(tv_write));
    541 
    542 if (timeout_read) {
    543 	tv_read.tv_sec = timeout_read;
    544 	ptv_read = &tv_read;
    545 }
    546 if (timeout_write) {
    547 	tv_write.tv_sec = timeout_write;
    548 	ptv_write = &tv_write;
    549 }
    550 
    551 bufferevent_set_timeouts(bufev, ptv_read, ptv_write);
    552 }
    553 
    554 
    555 int
    556 bufferevent_disable_hard_(struct bufferevent *bufev, short event)
    557 {
    558 int r = 0;
    559 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
    560 
    561 BEV_LOCK(bufev);
    562 bufev->enabled &= ~event;
    563 
    564 bufev_private->connecting = 0;
    565 if (bufev->be_ops->disable(bufev, event) < 0)
    566 	r = -1;
    567 
    568 BEV_UNLOCK(bufev);
    569 return r;
    570 }
    571 
    572 int
    573 bufferevent_disable(struct bufferevent *bufev, short event)
    574 {
    575 int r = 0;
    576 
    577 BEV_LOCK(bufev);
    578 bufev->enabled &= ~event;
    579 
    580 if (bufev->be_ops->disable(bufev, event) < 0)
    581 	r = -1;
    582 if (r)
    583 	event_debug(("%s: cannot disable 0x%hx on %p", __func__, event, bufev));
    584 
    585 BEV_UNLOCK(bufev);
    586 return r;
    587 }
    588 
/*
 * Sets the water marks.  Write watermarks are simple stores; a read high
 * watermark additionally installs (or disables) an evbuffer callback on
 * the input buffer that suspends/resumes reading around the threshold.
 */

void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);

	BEV_LOCK(bufev);
	if (events & EV_WRITE) {
		bufev->wm_write.low = lowmark;
		bufev->wm_write.high = highmark;
	}

	if (events & EV_READ) {
		bufev->wm_read.low = lowmark;
		bufev->wm_read.high = highmark;

		if (highmark) {
			/* There is now a new high-water mark for read.
			   enable the callback if needed, and see if we should
			   suspend/bufferevent_wm_unsuspend. */

			if (bufev_private->read_watermarks_cb == NULL) {
				bufev_private->read_watermarks_cb =
				    evbuffer_add_cb(bufev->input,
						    bufferevent_inbuf_wm_cb,
						    bufev);
			}
			/* NODEFER: watermark bookkeeping must run inline. */
			evbuffer_cb_set_flags(bufev->input,
				      bufev_private->read_watermarks_cb,
				      EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);

			/* Apply the new threshold to data already buffered. */
			if (evbuffer_get_length(bufev->input) >= highmark)
				bufferevent_wm_suspend_read(bufev);
			else if (evbuffer_get_length(bufev->input) < highmark)
				bufferevent_wm_unsuspend_read(bufev);
		} else {
			/* There is now no high-water mark for read. */
			if (bufev_private->read_watermarks_cb)
				evbuffer_cb_clear_flags(bufev->input,
				    bufev_private->read_watermarks_cb,
				    EVBUFFER_CB_ENABLED);
			bufferevent_wm_unsuspend_read(bufev);
		}
	}
	BEV_UNLOCK(bufev);
}
    639 
    640 int
    641 bufferevent_getwatermark(struct bufferevent *bufev, short events,
    642    size_t *lowmark, size_t *highmark)
    643 {
    644 if (events == EV_WRITE) {
    645 	BEV_LOCK(bufev);
    646 	if (lowmark)
    647 		*lowmark = bufev->wm_write.low;
    648 	if (highmark)
    649 		*highmark = bufev->wm_write.high;
    650 	BEV_UNLOCK(bufev);
    651 	return 0;
    652 }
    653 
    654 if (events == EV_READ) {
    655 	BEV_LOCK(bufev);
    656 	if (lowmark)
    657 		*lowmark = bufev->wm_read.low;
    658 	if (highmark)
    659 		*highmark = bufev->wm_read.high;
    660 	BEV_UNLOCK(bufev);
    661 	return 0;
    662 }
    663 return -1;
    664 }
    665 
    666 int
    667 bufferevent_flush(struct bufferevent *bufev,
    668    short iotype,
    669    enum bufferevent_flush_mode mode)
    670 {
    671 int r = -1;
    672 BEV_LOCK(bufev);
    673 if (bufev->be_ops->flush)
    674 	r = bufev->be_ops->flush(bufev, iotype, mode);
    675 BEV_UNLOCK(bufev);
    676 return r;
    677 }
    678 
/* Acquire the lock and take a reference, in that order.  Pair with
 * bufferevent_decref_and_unlock_(). */
void
bufferevent_incref_and_lock_(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
	BEV_LOCK(bufev);
	++bufev_private->refcnt;
}
    686 
#if 0
/* Currently unused: hand ownership of a shared lock from DONOR to
 * RECIPIENT so the lock outlives the donor.  Only applies when both
 * bufferevents already share the same lock and the recipient does not
 * already own one. */
static void
bufferevent_transfer_lock_ownership_(struct bufferevent *donor,
    struct bufferevent *recipient)
{
	struct bufferevent_private *d = BEV_UPCAST(donor);
	struct bufferevent_private *r = BEV_UPCAST(recipient);
	if (d->lock != r->lock)
		return;
	if (r->own_lock)
		return;
	if (d->own_lock) {
		d->own_lock = 0;
		r->own_lock = 1;
	}
}
#endif
    704 
/* Drop one reference and release the lock.  When the count hits zero,
 * unlink the backend and schedule finalization of every callback that
 * could still reference this object (read/write events, the deferred
 * dispatcher, the rate-limit refill event, and evbuffer callbacks);
 * actual destruction happens later in bufferevent_finalize_cb_().
 * Returns 1 if this was the last reference, else 0. */
int
bufferevent_decref_and_unlock_(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
	int n_cbs = 0;
#define MAX_CBS 16
	struct event_callback *cbs[MAX_CBS];

	EVUTIL_ASSERT(bufev_private->refcnt > 0);

	if (--bufev_private->refcnt) {
		BEV_UNLOCK(bufev);
		return 0;
	}

	if (bufev->be_ops->unlink)
		bufev->be_ops->unlink(bufev);

	/* Okay, we're out of references. Let's finalize this once all the
	 * callbacks are done running. */
	cbs[0] = &bufev->ev_read.ev_evcallback;
	cbs[1] = &bufev->ev_write.ev_evcallback;
	cbs[2] = &bufev_private->deferred;
	n_cbs = 3;
	if (bufev_private->rate_limiting) {
		struct event *e = &bufev_private->rate_limiting->refill_bucket_event;
		if (event_initialized(e))
			cbs[n_cbs++] = &e->ev_evcallback;
	}
	n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs);
	n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs);

	event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs,
	    bufferevent_finalize_cb_);

#undef MAX_CBS
	BEV_UNLOCK(bufev);

	return 1;
}
    745 
/* Final destruction, run once every callback registered in
 * bufferevent_decref_and_unlock_() has finished.  Tears down the backend,
 * buffers, rate limiting, and (if owned) the lock, then frees the memory.
 * The refcount is already zero here, so we are the sole owner; that is
 * why reading bufev_private->own_lock after BEV_UNLOCK below is safe. */
static void
bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
{
	struct bufferevent *bufev = arg_;
	struct bufferevent *underlying;
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);

	BEV_LOCK(bufev);
	underlying = bufferevent_get_underlying(bufev);

	/* Clean up the shared info */
	if (bufev->be_ops->destruct)
		bufev->be_ops->destruct(bufev);

	/* XXX what happens if refcnt for these buffers is > 1?
	 * The buffers can share a lock with this bufferevent object,
	 * but the lock might be destroyed below. */
	/* evbuffer will free the callbacks */
	evbuffer_free(bufev->input);
	evbuffer_free(bufev->output);

	if (bufev_private->rate_limiting) {
		if (bufev_private->rate_limiting->group)
			bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
		mm_free(bufev_private->rate_limiting);
		bufev_private->rate_limiting = NULL;
	}


	BEV_UNLOCK(bufev);

	/* Free the lock only if we allocated it ourselves. */
	if (bufev_private->own_lock)
		EVTHREAD_FREE_LOCK(bufev_private->lock,
		    EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Free the actual allocated memory. */
	mm_free(((char*)bufev) - bufev->be_ops->mem_offset);

	/* Release the reference to underlying now that we no longer need the
	 * reference to it.  We wait this long mainly in case our lock is
	 * shared with underlying.
	 *
	 * The 'destruct' function will also drop a reference to underlying
	 * if BEV_OPT_CLOSE_ON_FREE is set.
	 *
	 * XXX Should we/can we just refcount evbuffer/bufferevent locks?
	 * It would probably save us some headaches.
	 */
	if (underlying)
		bufferevent_decref_(underlying);
}
    797 
/* Public refcount decrement.  Returns 1 if this released the last
 * reference (object is being finalized), 0 otherwise. */
int
bufferevent_decref(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);
	return bufferevent_decref_and_unlock_(bufev);
}
    804 
/* Public destructor: clear the user callbacks so none fire during
 * teardown, cancel pending backend operations, then drop the caller's
 * reference (destruction is deferred until all references are gone). */
void
bufferevent_free(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);
	bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
	bufferevent_cancel_all_(bufev);
	bufferevent_decref_and_unlock_(bufev);
}
    813 
    814 void
    815 bufferevent_incref(struct bufferevent *bufev)
    816 {
    817 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
    818 
    819 /* XXX: now that this function is public, we might want to
    820  * - return the count from this function
    821  * - create a new function to atomically grab the current refcount
    822  */
    823 BEV_LOCK(bufev);
    824 ++bufev_private->refcnt;
    825 BEV_UNLOCK(bufev);
    826 }
    827 
/* Attach a lock to BUFEV (and to its evbuffers).  If LOCK is NULL we
 * either share the underlying bufferevent's lock or allocate a fresh
 * recursive lock that we then own.  Recurses onto the underlying
 * bufferevent so the whole chain shares one lock.  Returns 0 on success,
 * -1 if locking is unavailable or already enabled. */
int
bufferevent_enable_locking_(struct bufferevent *bufev, void *lock)
{
#ifdef EVENT__DISABLE_THREAD_SUPPORT
	return -1;
#else
	struct bufferevent *underlying;

	/* Already locked: refuse to replace the lock. */
	if (BEV_UPCAST(bufev)->lock)
		return -1;
	underlying = bufferevent_get_underlying(bufev);

	if (!lock && underlying && BEV_UPCAST(underlying)->lock) {
		/* Share the underlying bufferevent's existing lock. */
		lock = BEV_UPCAST(underlying)->lock;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	} else if (!lock) {
		/* Allocate our own lock; we are responsible for freeing it. */
		EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
		if (!lock)
			return -1;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 1;
	} else {
		/* Caller-provided lock; caller keeps ownership. */
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	}
	evbuffer_enable_locking(bufev->input, lock);
	evbuffer_enable_locking(bufev->output, lock);

	/* Propagate the same lock down to the underlying bufferevent. */
	if (underlying && !BEV_UPCAST(underlying)->lock)
		bufferevent_enable_locking_(underlying, lock);

	return 0;
#endif
}
    863 
    864 int
    865 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
    866 {
    867 union bufferevent_ctrl_data d;
    868 int res = -1;
    869 d.fd = fd;
    870 BEV_LOCK(bev);
    871 if (bev->be_ops->ctrl)
    872 	res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d);
    873 if (res)
    874 	event_debug(("%s: cannot set fd for %p to "EV_SOCK_FMT, __func__, bev, fd));
    875 BEV_UNLOCK(bev);
    876 return res;
    877 }
    878 
    879 evutil_socket_t
    880 bufferevent_getfd(struct bufferevent *bev)
    881 {
    882 union bufferevent_ctrl_data d;
    883 int res = -1;
    884 d.fd = -1;
    885 BEV_LOCK(bev);
    886 if (bev->be_ops->ctrl)
    887 	res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
    888 if (res)
    889 	event_debug(("%s: cannot get fd for %p", __func__, bev));
    890 BEV_UNLOCK(bev);
    891 return (res<0) ? -1 : d.fd;
    892 }
    893 
    894 enum bufferevent_options
    895 bufferevent_get_options_(struct bufferevent *bev)
    896 {
    897 struct bufferevent_private *bev_p = BEV_UPCAST(bev);
    898 enum bufferevent_options options;
    899 
    900 BEV_LOCK(bev);
    901 options = bev_p->options;
    902 BEV_UNLOCK(bev);
    903 return options;
    904 }
    905 
    906 
    907 static void
    908 bufferevent_cancel_all_(struct bufferevent *bev)
    909 {
    910 union bufferevent_ctrl_data d;
    911 memset(&d, 0, sizeof(d));
    912 BEV_LOCK(bev);
    913 if (bev->be_ops->ctrl)
    914 	bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
    915 BEV_UNLOCK(bev);
    916 }
    917 
    918 short
    919 bufferevent_get_enabled(struct bufferevent *bufev)
    920 {
    921 short r;
    922 BEV_LOCK(bufev);
    923 r = bufev->enabled;
    924 BEV_UNLOCK(bufev);
    925 return r;
    926 }
    927 
    928 struct bufferevent *
    929 bufferevent_get_underlying(struct bufferevent *bev)
    930 {
    931 union bufferevent_ctrl_data d;
    932 int res = -1;
    933 d.ptr = NULL;
    934 BEV_LOCK(bev);
    935 if (bev->be_ops->ctrl)
    936 	res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d);
    937 BEV_UNLOCK(bev);
    938 return (res<0) ? NULL : d.ptr;
    939 }
    940 
/* Fired when the read-inactivity timer expires: disable further reading
 * and report a read timeout to the user. */
static void
bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
	struct bufferevent *bev = ctx;
	bufferevent_incref_and_lock_(bev);
	bufferevent_disable(bev, EV_READ);
	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0);
	bufferevent_decref_and_unlock_(bev);
}
/* Fired when the write-inactivity timer expires: disable further writing
 * and report a write timeout to the user. */
static void
bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
	struct bufferevent *bev = ctx;
	bufferevent_incref_and_lock_(bev);
	bufferevent_disable(bev, EV_WRITE);
	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0);
	bufferevent_decref_and_unlock_(bev);
}
    959 
/* Repurpose ev_read/ev_write as pure timers (fd == -1) for bufferevent
 * types that have no socket of their own (filters, pairs, ...). */
void
bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
{
	event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE,
	    bufferevent_generic_read_timeout_cb, bev);
	event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE,
	    bufferevent_generic_write_timeout_cb, bev);
}
    968 
/* (Re)arm or cancel the generic timeout timers to match the current
 * enabled/suspended state.  The write timer additionally only runs while
 * there is pending output.  Returns 0 on success, -1 on failure. */
int
bufferevent_generic_adj_timeouts_(struct bufferevent *bev)
{
	const short enabled = bev->enabled;
	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
	int r1=0, r2=0;
	if ((enabled & EV_READ) && !bev_p->read_suspended &&
	    evutil_timerisset(&bev->timeout_read))
		r1 = event_add(&bev->ev_read, &bev->timeout_read);
	else
		r1 = event_del(&bev->ev_read);

	if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
	    evutil_timerisset(&bev->timeout_write) &&
	    evbuffer_get_length(bev->output))
		r2 = event_add(&bev->ev_write, &bev->timeout_write);
	else
		r2 = event_del(&bev->ev_write);
	if (r1 < 0 || r2 < 0)
		return -1;
	return 0;
}
    991 
/* Apply newly-set timeout values to timers that are already pending,
 * without changing which events are pending.  Returns 0 on success, -1
 * if re-adding an event failed. */
int
bufferevent_generic_adj_existing_timeouts_(struct bufferevent *bev)
{
	int r = 0;
	if (event_pending(&bev->ev_read, EV_READ, NULL)) {
		if (evutil_timerisset(&bev->timeout_read)) {
			    if (bufferevent_add_event_(&bev->ev_read, &bev->timeout_read) < 0)
				    r = -1;
		} else {
			/* Timeout cleared: keep the event, drop its timer. */
			event_remove_timer(&bev->ev_read);
		}
	}
	if (event_pending(&bev->ev_write, EV_WRITE, NULL)) {
		if (evutil_timerisset(&bev->timeout_write)) {
			if (bufferevent_add_event_(&bev->ev_write, &bev->timeout_write) < 0)
				r = -1;
		} else {
			event_remove_timer(&bev->ev_write);
		}
	}
	return r;
}
   1014 
   1015 int
   1016 bufferevent_add_event_(struct event *ev, const struct timeval *tv)
   1017 {
   1018 if (!evutil_timerisset(tv))
   1019 	return event_add(ev, NULL);
   1020 else
   1021 	return event_add(ev, tv);
   1022 }
   1023 
/* For use by user programs only; internally, we should be calling
   either bufferevent_incref_and_lock_(), or BEV_LOCK. */
void
bufferevent_lock(struct bufferevent *bev)
{
	/* Also takes a reference so the object survives while held. */
	bufferevent_incref_and_lock_(bev);
}
   1031 
/* Counterpart to bufferevent_lock(): drops the reference taken there and
 * releases the lock. */
void
bufferevent_unlock(struct bufferevent *bev)
{
	bufferevent_decref_and_unlock_(bev);
}