diff options
Diffstat (limited to 'tools/blktap2/drivers/tapdisk-stream.c')
-rw-r--r-- | tools/blktap2/drivers/tapdisk-stream.c | 600 |
1 files changed, 600 insertions, 0 deletions
diff --git a/tools/blktap2/drivers/tapdisk-stream.c b/tools/blktap2/drivers/tapdisk-stream.c new file mode 100644 index 0000000000..8fa9d9e0bf --- /dev/null +++ b/tools/blktap2/drivers/tapdisk-stream.c @@ -0,0 +1,600 @@ +/* + * Copyright (c) 2008, XenSource Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of XenSource Inc. nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#include <stdio.h> +#include <errno.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <unistd.h> + +#include "list.h" +#include "scheduler.h" +#include "tapdisk-vbd.h" +#include "tapdisk-server.h" + +#define POLL_READ 0 +#define POLL_WRITE 1 + +#define MIN(a, b) ((a) < (b) ? (a) : (b)) + +struct tapdisk_stream_poll { + int pipe[2]; + int set; +}; + +struct tapdisk_stream_request { + uint64_t sec; + uint32_t secs; + uint64_t seqno; + blkif_request_t blkif_req; + struct list_head next; +}; + +struct tapdisk_stream { + td_vbd_t *vbd; + + unsigned int id; + int in_fd; + int out_fd; + + int err; + + uint64_t cur; + uint64_t start; + uint64_t end; + + uint64_t started; + uint64_t completed; + + struct tapdisk_stream_poll poll; + event_id_t enqueue_event_id; + + struct list_head free_list; + struct list_head pending_list; + struct list_head completed_list; + + struct tapdisk_stream_request requests[MAX_REQUESTS]; +}; + +static unsigned int tapdisk_stream_count; + +static void tapdisk_stream_close_image(struct tapdisk_stream *); + +static void +usage(const char *app, int err) +{ + printf("usage: %s <-n type:/path/to/image> " + "[-c sector count] [-s skip sectors]\n", app); + exit(err); +} + +static inline void +tapdisk_stream_poll_initialize(struct tapdisk_stream_poll *p) +{ + p->set = 0; + p->pipe[POLL_READ] = p->pipe[POLL_WRITE] = -1; +} + +static int +tapdisk_stream_poll_open(struct tapdisk_stream_poll *p) +{ + int err; + + tapdisk_stream_poll_initialize(p); + + err = pipe(p->pipe); + if (err) + return -errno; + + err = fcntl(p->pipe[POLL_READ], F_SETFL, O_NONBLOCK); + if (err) + goto out; + + err = fcntl(p->pipe[POLL_WRITE], F_SETFL, O_NONBLOCK); + if (err) + goto out; + + return 0; + +out: + close(p->pipe[POLL_READ]); + close(p->pipe[POLL_WRITE]); + tapdisk_stream_poll_initialize(p); + return -errno; +} + +static void +tapdisk_stream_poll_close(struct tapdisk_stream_poll *p) +{ + if (p->pipe[POLL_READ] != -1) + close(p->pipe[POLL_READ]); + if (p->pipe[POLL_WRITE] != -1) + close(p->pipe[POLL_WRITE]); + tapdisk_stream_poll_initialize(p); +} + +static inline void +tapdisk_stream_poll_clear(struct tapdisk_stream_poll *p) +{ + int dummy; + + read(p->pipe[POLL_READ], &dummy, sizeof(dummy)); + p->set = 0; +} + +static inline void +tapdisk_stream_poll_set(struct tapdisk_stream_poll *p) +{ + int dummy = 0; + + if (!p->set) { + write(p->pipe[POLL_WRITE], &dummy, sizeof(dummy)); + p->set = 1; + } +} + +static inline int +tapdisk_stream_stop(struct tapdisk_stream *s) +{ + return (list_empty(&s->pending_list) && (s->cur == s->end || s->err)); +} + +static inline void +tapdisk_stream_initialize_request(struct tapdisk_stream_request *req) +{ + memset(req, 0, sizeof(*req)); + INIT_LIST_HEAD(&req->next); +} + +static inline int +tapdisk_stream_request_idx(struct tapdisk_stream *s, + struct tapdisk_stream_request *req) +{ + return (req - s->requests); +} + +static inline struct tapdisk_stream_request * +tapdisk_stream_get_request(struct tapdisk_stream *s) +{ + struct tapdisk_stream_request *req; + + if (list_empty(&s->free_list)) + return NULL; + + req = list_entry(s->free_list.next, + struct tapdisk_stream_request, next); + + list_del_init(&req->next); + tapdisk_stream_initialize_request(req); + + return req; +} + +static void +tapdisk_stream_print_request(struct tapdisk_stream *s, + struct tapdisk_stream_request *sreq) +{ + unsigned long idx = (unsigned long)tapdisk_stream_request_idx(s, sreq); + char *buf = (char *)MMAP_VADDR(s->vbd->ring.vstart, idx, 0); + write(s->out_fd, buf, sreq->secs << SECTOR_SHIFT); +} + +static void +tapdisk_stream_write_data(struct tapdisk_stream *s) +{ + struct tapdisk_stream_request *sreq, *tmp; + + list_for_each_entry_safe(sreq, tmp, &s->completed_list, next) { + if (sreq->seqno != s->completed) + break; + + s->completed++; + tapdisk_stream_print_request(s, sreq); + + list_del_init(&sreq->next); + list_add_tail(&sreq->next, &s->free_list); + } +} + +static inline void +tapdisk_stream_queue_completed(struct tapdisk_stream *s, + struct tapdisk_stream_request *sreq) +{ + struct tapdisk_stream_request *itr; + + list_for_each_entry(itr, &s->completed_list, next) + if (sreq->seqno < itr->seqno) { + list_add_tail(&sreq->next, &itr->next); + return; + } + + list_add_tail(&sreq->next, &s->completed_list); +} + +static void +tapdisk_stream_dequeue(void *arg, blkif_response_t *rsp) +{ + struct tapdisk_stream *s = (struct tapdisk_stream *)arg; + struct tapdisk_stream_request *sreq = s->requests + rsp->id; + + list_del_init(&sreq->next); + + if (rsp->status == BLKIF_RSP_OKAY) + tapdisk_stream_queue_completed(s, sreq); + else { + s->err = EIO; + list_add_tail(&sreq->next, &s->free_list); + fprintf(stderr, "error reading sector 0x%"PRIu64"\n", sreq->sec); + } + + tapdisk_stream_write_data(s); + tapdisk_stream_poll_set(&s->poll); +} + +static void +tapdisk_stream_enqueue(event_id_t id, char mode, void *arg) +{ + td_vbd_t *vbd; + int i, idx, psize; + struct tapdisk_stream *s = (struct tapdisk_stream *)arg; + + vbd = s->vbd; + tapdisk_stream_poll_clear(&s->poll); + + if (tapdisk_stream_stop(s)) { + tapdisk_stream_close_image(s); + return; + } + + psize = getpagesize(); + + while (s->cur < s->end && !s->err) { + blkif_request_t *breq; + td_vbd_request_t *vreq; + struct tapdisk_stream_request *sreq; + + sreq = tapdisk_stream_get_request(s); + if (!sreq) + break; + + idx = tapdisk_stream_request_idx(s, sreq); + + sreq->sec = s->cur; + sreq->secs = 0; + sreq->seqno = s->started++; + + breq = &sreq->blkif_req; + breq->id = idx; + breq->nr_segments = 0; + breq->sector_number = sreq->sec; + breq->operation = BLKIF_OP_READ; + + for (i = 0; i < BLKIF_MAX_SEGMENTS_PER_REQUEST; i++) { + uint32_t secs = MIN(s->end - s->cur, psize >> SECTOR_SHIFT); + struct blkif_request_segment *seg = breq->seg + i; + + if (!secs) + break; + + sreq->secs += secs; + s->cur += secs; + + seg->first_sect = 0; + seg->last_sect = secs - 1; + breq->nr_segments++; + } + + vreq = vbd->request_list + idx; + + assert(list_empty(&vreq->next)); + assert(vreq->secs_pending == 0); + + memcpy(&vreq->req, breq, sizeof(*breq)); + vbd->received++; + vreq->vbd = vbd; + + tapdisk_vbd_move_request(vreq, &vbd->new_requests); + list_add_tail(&sreq->next, &s->pending_list); + } + + tapdisk_vbd_issue_requests(vbd); +} + +static int +tapdisk_stream_open_image(struct tapdisk_stream *s, const char *path, int type) +{ + int err; + + s->id = tapdisk_stream_count++; + + err = tapdisk_server_initialize(NULL, NULL); + if (err) + goto out; + + err = tapdisk_vbd_initialize(-1, -1, s->id); + if (err) + goto out; + + s->vbd = tapdisk_server_get_vbd(s->id); + if (!s->vbd) { + err = ENODEV; + goto out; + } + + tapdisk_vbd_set_callback(s->vbd, tapdisk_stream_dequeue, s); + + err = tapdisk_vbd_open_vdi(s->vbd, path, type, + TAPDISK_STORAGE_TYPE_DEFAULT, + TD_OPEN_RDONLY); + if (err) + goto out; + + s->vbd->reopened = 1; + err = 0; + +out: + if (err) + fprintf(stderr, "failed to open %s: %d\n", path, err); + return err; +} + +static void +tapdisk_stream_close_image(struct tapdisk_stream *s) +{ + td_vbd_t *vbd; + + vbd = tapdisk_server_get_vbd(s->id); + if (vbd) { + tapdisk_vbd_close_vdi(vbd); + tapdisk_server_remove_vbd(vbd); + free((void *)vbd->ring.vstart); + free(vbd->name); + free(vbd); + s->vbd = NULL; + } +} + +static int +tapdisk_stream_set_position(struct tapdisk_stream *s, + uint64_t count, uint64_t skip) +{ + int err; + image_t image; + + err = tapdisk_vbd_get_image_info(s->vbd, &image); + if (err) { + fprintf(stderr, "failed getting image size: %d\n", err); + return err; + } + + if (count == (uint64_t)-1) + count = image.size - skip; + + if (count + skip > image.size) { + fprintf(stderr, "0x%"PRIx64" past end of image 0x%"PRIx64"\n", + (uint64_t) (count + skip), (uint64_t) image.size); + return -EINVAL; + } + + s->start = skip; + s->cur = s->start; + s->end = s->start + count; + + return 0; +} + +static int +tapdisk_stream_initialize_requests(struct tapdisk_stream *s) +{ + size_t size; + td_ring_t *ring; + int err, i, psize; + + ring = &s->vbd->ring; + psize = getpagesize(); + size = psize * BLKTAP_MMAP_REGION_SIZE; + + /* sneaky -- set up ring->vstart so tapdisk_vbd will use our buffers */ + err = posix_memalign((void **)&ring->vstart, psize, size); + if (err) { + fprintf(stderr, "failed to allocate buffers: %d\n", err); + ring->vstart = 0; + return err; + } + + for (i = 0; i < MAX_REQUESTS; i++) { + struct tapdisk_stream_request *req = s->requests + i; + tapdisk_stream_initialize_request(req); + list_add_tail(&req->next, &s->free_list); + } + + return 0; +} + +static int +tapdisk_stream_register_enqueue_event(struct tapdisk_stream *s) +{ + int err; + struct tapdisk_stream_poll *p = &s->poll; + + err = tapdisk_stream_poll_open(p); + if (err) + goto out; + + err = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, + p->pipe[POLL_READ], 0, + tapdisk_stream_enqueue, s); + if (err < 0) + goto out; + + s->enqueue_event_id = err; + err = 0; + +out: + if (err) + fprintf(stderr, "failed to register event: %d\n", err); + return err; +} + +static void +tapdisk_stream_unregister_enqueue_event(struct tapdisk_stream *s) +{ + if (s->enqueue_event_id) { + tapdisk_server_unregister_event(s->enqueue_event_id); + s->enqueue_event_id = 0; + } + tapdisk_stream_poll_close(&s->poll); +} + +static inline void +tapdisk_stream_initialize(struct tapdisk_stream *s) +{ + memset(s, 0, sizeof(*s)); + s->in_fd = s->out_fd = -1; + INIT_LIST_HEAD(&s->free_list); + INIT_LIST_HEAD(&s->pending_list); + INIT_LIST_HEAD(&s->completed_list); +} + +static int +tapdisk_stream_open_fds(struct tapdisk_stream *s) +{ + s->out_fd = dup(STDOUT_FILENO); + if (s->out_fd == -1) { + fprintf(stderr, "failed to open output: %d\n", errno); + return errno; + } + + return 0; +} + +static int +tapdisk_stream_open(struct tapdisk_stream *s, const char *path, + int type, uint64_t count, uint64_t skip) +{ + int err; + + tapdisk_stream_initialize(s); + + err = tapdisk_stream_open_fds(s); + if (err) + return err; + + err = tapdisk_stream_open_image(s, path, type); + if (err) + return err; + + err = tapdisk_stream_set_position(s, count, skip); + if (err) + return err; + + err = tapdisk_stream_initialize_requests(s); + if (err) + return err; + + err = tapdisk_stream_register_enqueue_event(s); + if (err) + return err; + + return 0; +} + +static void +tapdisk_stream_release(struct tapdisk_stream *s) +{ + close(s->out_fd); + tapdisk_stream_close_image(s); + tapdisk_stream_unregister_enqueue_event(s); +} + +static int +tapdisk_stream_run(struct tapdisk_stream *s) +{ + tapdisk_stream_enqueue(s->enqueue_event_id, SCHEDULER_POLL_READ_FD, s); + tapdisk_server_run(); + return s->err; +} + +int +main(int argc, char *argv[]) +{ + int c, err, type; + char *params, *path; + uint64_t count, skip; + struct tapdisk_stream stream; + + err = 0; + skip = 0; + count = (uint64_t)-1; + params = NULL; + + while ((c = getopt(argc, argv, "n:c:s:h")) != -1) { + switch (c) { + case 'n': + params = optarg; + break; + case 'c': + count = strtoull(optarg, NULL, 10); + break; + case 's': + skip = strtoull(optarg, NULL, 10); + break; + default: + err = EINVAL; + case 'h': + usage(argv[0], err); + } + } + + if (!params) + usage(argv[0], EINVAL); + + err = tapdisk_parse_disk_type(params, &path, &type); + if (err) { + fprintf(stderr, "invalid argument %s: %d\n", params, err); + return err; + } + + tapdisk_start_logging("tapdisk-stream"); + + err = tapdisk_stream_open(&stream, path, type, count, skip); + if (err) + goto out; + + err = tapdisk_stream_run(&stream); + if (err) + goto out; + + err = 0; + +out: + tapdisk_stream_release(&stream); + tapdisk_stop_logging(); + return err; +} |