compression support
This commit is contained in:
parent
84fb151993
commit
6ad42c0889
@ -1024,13 +1024,13 @@ void Connection::read_callback() {
|
||||
full_read = prot->handle_response(input, done, found, obj_size);
|
||||
if (!full_read) {
|
||||
|
||||
char key[256];
|
||||
char log[1024];
|
||||
string keystr = op->key;
|
||||
strcpy(key, keystr.c_str());
|
||||
int valuelen = op->valuelen;
|
||||
sprintf(log,"ERROR SETTING: %s,%d\n",key,valuelen);
|
||||
write(2,log,strlen(log));
|
||||
//char key[256];
|
||||
//char log[1024];
|
||||
//string keystr = op->key;
|
||||
//strcpy(key, keystr.c_str());
|
||||
//int valuelen = op->valuelen;
|
||||
//sprintf(log,"ERROR SETTING: %s,%d\n",key,valuelen);
|
||||
//write(2,log,strlen(log));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -48,6 +48,7 @@ src = Split("""mutilate.cc cmdline.cc log.cc distributions.cc util.cc
|
||||
if not env['HAVE_POSIX_BARRIER']: # USE_POSIX_BARRIER:
|
||||
src += ['barrier.cc']
|
||||
|
||||
src += ['libzstd.a']
|
||||
env.Program(target='mutilate', source=src)
|
||||
env.Program(target='gtest', source=['TestGenerator.cc', 'log.cc', 'util.cc',
|
||||
'Generator.cc'])
|
||||
|
236
common.h
Normal file
236
common.h
Normal file
@ -0,0 +1,236 @@
|
||||
/*
|
||||
* Copyright (c) 2016-2021, Yann Collet, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under both the BSD-style license (found in the
|
||||
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
|
||||
* in the COPYING file in the root directory of this source tree).
|
||||
* You may select, at your option, one of the above-listed licenses.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This header file has common utility functions used in examples.
|
||||
*/
|
||||
#ifndef COMMON_H
|
||||
#define COMMON_H
|
||||
|
||||
#include <stdlib.h> // malloc, free, exit
|
||||
#include <stdio.h> // fprintf, perror, fopen, etc.
|
||||
#include <string.h> // strerror
|
||||
#include <errno.h> // errno
|
||||
#include <sys/stat.h> // stat
|
||||
#include <zstd.h>
|
||||
|
||||
/*
|
||||
* Define the returned error code from utility functions.
|
||||
*/
|
||||
typedef enum {
|
||||
ERROR_fsize = 1,
|
||||
ERROR_fopen = 2,
|
||||
ERROR_fclose = 3,
|
||||
ERROR_fread = 4,
|
||||
ERROR_fwrite = 5,
|
||||
ERROR_loadFile = 6,
|
||||
ERROR_saveFile = 7,
|
||||
ERROR_malloc = 8,
|
||||
ERROR_largeFile = 9,
|
||||
} COMMON_ErrorCode;
|
||||
|
||||
/*! CHECK
|
||||
* Check that the condition holds. If it doesn't print a message and die.
|
||||
*/
|
||||
#define CHECK(cond, ...) \
|
||||
do { \
|
||||
if (!(cond)) { \
|
||||
fprintf(stderr, \
|
||||
"%s:%d CHECK(%s) failed: ", \
|
||||
__FILE__, \
|
||||
__LINE__, \
|
||||
#cond); \
|
||||
fprintf(stderr, "" __VA_ARGS__); \
|
||||
fprintf(stderr, "\n"); \
|
||||
exit(1); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
/*! CHECK_ZSTD
|
||||
* Check the zstd error code and die if an error occurred after printing a
|
||||
* message.
|
||||
*/
|
||||
#define CHECK_ZSTD(fn, ...) \
|
||||
do { \
|
||||
size_t const err = (fn); \
|
||||
CHECK(!ZSTD_isError(err), "%s", ZSTD_getErrorName(err)); \
|
||||
} while (0)
|
||||
|
||||
/*! fsize_orDie() :
|
||||
* Get the size of a given file path.
|
||||
*
|
||||
* @return The size of a given file path.
|
||||
*
|
||||
static size_t fsize_orDie(const char *filename)
|
||||
{
|
||||
struct stat st;
|
||||
if (stat(filename, &st) != 0) {
|
||||
perror(filename);
|
||||
exit(ERROR_fsize);
|
||||
}
|
||||
|
||||
off_t const fileSize = st.st_size;
|
||||
size_t const size = (size_t)fileSize;
|
||||
* 1. fileSize should be non-negative,
|
||||
* 2. if off_t -> size_t type conversion results in discrepancy,
|
||||
* the file size is too large for type size_t.
|
||||
*
|
||||
if ((fileSize < 0) || (fileSize != (off_t)size)) {
|
||||
fprintf(stderr, "%s : filesize too large \n", filename);
|
||||
exit(ERROR_largeFile);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
*/
|
||||
|
||||
/*! fopen_orDie() :
|
||||
* Open a file using given file path and open option.
|
||||
*
|
||||
* @return If successful this function will return a FILE pointer to an
|
||||
* opened file otherwise it sends an error to stderr and exits.
|
||||
*/
|
||||
static FILE* fopen_orDie(const char *filename, const char *instruction)
|
||||
{
|
||||
FILE* const inFile = fopen(filename, instruction);
|
||||
if (inFile) return inFile;
|
||||
/* error */
|
||||
perror(filename);
|
||||
exit(ERROR_fopen);
|
||||
}
|
||||
|
||||
/*! fclose_orDie() :
|
||||
* Close an opened file using given FILE pointer.
|
||||
*/
|
||||
static void fclose_orDie(FILE* file)
|
||||
{
|
||||
if (!fclose(file)) { return; };
|
||||
/* error */
|
||||
perror("fclose");
|
||||
exit(ERROR_fclose);
|
||||
}
|
||||
|
||||
/*! fread_orDie() :
|
||||
*
|
||||
* Read sizeToRead bytes from a given file, storing them at the
|
||||
* location given by buffer.
|
||||
*
|
||||
* @return The number of bytes read.
|
||||
*/
|
||||
static size_t fread_orDie(void* buffer, size_t sizeToRead, FILE* file)
|
||||
{
|
||||
size_t const readSize = fread(buffer, 1, sizeToRead, file);
|
||||
if (readSize == sizeToRead) return readSize; /* good */
|
||||
if (feof(file)) return readSize; /* good, reached end of file */
|
||||
/* error */
|
||||
perror("fread");
|
||||
exit(ERROR_fread);
|
||||
}
|
||||
|
||||
/*! fwrite_orDie() :
|
||||
*
|
||||
* Write sizeToWrite bytes to a file pointed to by file, obtaining
|
||||
* them from a location given by buffer.
|
||||
*
|
||||
* Note: This function will send an error to stderr and exit if it
|
||||
* cannot write data to the given file pointer.
|
||||
*
|
||||
* @return The number of bytes written.
|
||||
*/
|
||||
//static size_t fwrite_orDie(const void* buffer, size_t sizeToWrite, FILE* file)
|
||||
//{
|
||||
// size_t const writtenSize = fwrite(buffer, 1, sizeToWrite, file);
|
||||
// if (writtenSize == sizeToWrite) return sizeToWrite; /* good */
|
||||
// /* error */
|
||||
// perror("fwrite");
|
||||
// exit(ERROR_fwrite);
|
||||
//}
|
||||
|
||||
/*! malloc_orDie() :
|
||||
* Allocate memory.
|
||||
*
|
||||
* @return If successful this function returns a pointer to allo-
|
||||
* cated memory. If there is an error, this function will send that
|
||||
* error to stderr and exit.
|
||||
*/
|
||||
static void* malloc_orDie(size_t size)
|
||||
{
|
||||
void* const buff = malloc(size);
|
||||
if (buff) return buff;
|
||||
/* error */
|
||||
perror("malloc");
|
||||
exit(ERROR_malloc);
|
||||
}
|
||||
|
||||
/*! loadFile_orDie() :
|
||||
* load file into buffer (memory).
|
||||
*
|
||||
* Note: This function will send an error to stderr and exit if it
|
||||
* cannot read data from the given file path.
|
||||
*
|
||||
* @return If successful this function will load file into buffer and
|
||||
* return file size, otherwise it will printout an error to stderr and exit.
|
||||
*
|
||||
static size_t loadFile_orDie(const char* fileName, void* buffer, size_t bufferSize)
|
||||
{
|
||||
size_t const fileSize = fsize_orDie(fileName);
|
||||
CHECK(fileSize <= bufferSize, "File too large!");
|
||||
|
||||
FILE* const inFile = fopen_orDie(fileName, "rb");
|
||||
size_t const readSize = fread(buffer, 1, fileSize, inFile);
|
||||
if (readSize != (size_t)fileSize) {
|
||||
fprintf(stderr, "fread: %s : %s \n", fileName, strerror(errno));
|
||||
exit(ERROR_fread);
|
||||
}
|
||||
fclose(inFile);
|
||||
return fileSize;
|
||||
}
|
||||
*/
|
||||
|
||||
/*! mallocAndLoadFile_orDie() :
|
||||
* allocate memory buffer and then load file into it.
|
||||
*
|
||||
* Note: This function will send an error to stderr and exit if memory allocation
|
||||
* fails or it cannot read data from the given file path.
|
||||
*
|
||||
* @return If successful this function will return buffer and bufferSize(=fileSize),
|
||||
* otherwise it will printout an error to stderr and exit.
|
||||
*
|
||||
static void* mallocAndLoadFile_orDie(const char* fileName, size_t* bufferSize) {
|
||||
size_t const fileSize = fsize_orDie(fileName);
|
||||
*bufferSize = fileSize;
|
||||
void* const buffer = malloc_orDie(*bufferSize);
|
||||
loadFile_orDie(fileName, buffer, *bufferSize);
|
||||
return buffer;
|
||||
}
|
||||
*/
|
||||
|
||||
/*! saveFile_orDie() :
|
||||
*
|
||||
* Save buffSize bytes to a given file path, obtaining them from a location pointed
|
||||
* to by buff.
|
||||
*
|
||||
* Note: This function will send an error to stderr and exit if it
|
||||
* cannot write to a given file.
|
||||
*/
|
||||
//static void saveFile_orDie(const char* fileName, const void* buff, size_t buffSize)
|
||||
//{
|
||||
// FILE* const oFile = fopen_orDie(fileName, "wb");
|
||||
// size_t const wSize = fwrite(buff, 1, buffSize, oFile);
|
||||
// if (wSize != (size_t)buffSize) {
|
||||
// fprintf(stderr, "fwrite: %s : %s \n", fileName, strerror(errno));
|
||||
// exit(ERROR_fwrite);
|
||||
// }
|
||||
// if (fclose(oFile)) {
|
||||
// perror(fileName);
|
||||
// exit(ERROR_fclose);
|
||||
// }
|
||||
//}
|
||||
|
||||
#endif
|
154
mutilate.cc
154
mutilate.cc
@ -21,6 +21,10 @@
|
||||
#include <event2/thread.h>
|
||||
#include <event2/util.h>
|
||||
|
||||
|
||||
#include "common.h" //for zstd
|
||||
#include "zstd.h" //shippped with mutilate
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#ifdef HAVE_LIBZMQ
|
||||
@ -798,20 +802,150 @@ int stick_this_thread_to_core(int core_id) {
|
||||
return pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset);
|
||||
}
|
||||
|
||||
bool hasEnding (string const &fullString, string const &ending) {
|
||||
if (fullString.length() >= ending.length()) {
|
||||
return (0 == fullString.compare (fullString.length() - ending.length(), ending.length(), ending));
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static char *get_stream(ZSTD_DCtx* dctx, FILE *fin, size_t const buffInSize, void* const buffIn, size_t const buffOutSize, void* const buffOut) {
|
||||
/* This loop assumes that the input file is one or more concatenated zstd
|
||||
* streams. This example won't work if there is trailing non-zstd data at
|
||||
* the end, but streaming decompression in general handles this case.
|
||||
* ZSTD_decompressStream() returns 0 exactly when the frame is completed,
|
||||
* and doesn't consume input after the frame.
|
||||
*/
|
||||
size_t const toRead = buffInSize;
|
||||
size_t read;
|
||||
size_t lastRet = 0;
|
||||
int isEmpty = 1;
|
||||
if ( (read = fread_orDie(buffIn, toRead, fin)) ) {
|
||||
isEmpty = 0;
|
||||
ZSTD_inBuffer input = { buffIn, read, 0 };
|
||||
/* Given a valid frame, zstd won't consume the last byte of the frame
|
||||
* until it has flushed all of the decompressed data of the frame.
|
||||
* Therefore, instead of checking if the return code is 0, we can
|
||||
* decompress just check if input.pos < input.size.
|
||||
*/
|
||||
char *trace = (char*)malloc(buffOutSize*2);
|
||||
memset(trace,0,buffOutSize+1);
|
||||
size_t tracelen = buffOutSize+1;
|
||||
size_t total = 0;
|
||||
while (input.pos < input.size) {
|
||||
ZSTD_outBuffer output = { buffOut, buffOutSize, 0 };
|
||||
/* The return code is zero if the frame is complete, but there may
|
||||
* be multiple frames concatenated together. Zstd will automatically
|
||||
* reset the context when a frame is complete. Still, calling
|
||||
* ZSTD_DCtx_reset() can be useful to reset the context to a clean
|
||||
* state, for instance if the last decompression call returned an
|
||||
* error.
|
||||
*/
|
||||
|
||||
size_t const ret = ZSTD_decompressStream(dctx, &output , &input);
|
||||
|
||||
if (output.pos + total > tracelen) {
|
||||
trace = (char*)realloc(trace,(output.pos+total+1));
|
||||
tracelen = (output.pos+total+1);
|
||||
}
|
||||
strncat(trace,(const char*)buffOut,output.pos);
|
||||
total += output.pos;
|
||||
|
||||
lastRet = ret;
|
||||
}
|
||||
int idx = total;
|
||||
while (trace[idx] != '\n') {
|
||||
idx--;
|
||||
}
|
||||
trace[idx] = 0;
|
||||
trace[idx+1] = 0;
|
||||
return trace;
|
||||
|
||||
}
|
||||
|
||||
if (isEmpty) {
|
||||
fprintf(stderr, "input is empty\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (lastRet != 0) {
|
||||
/* The last return value from ZSTD_decompressStream did not end on a
|
||||
* frame, but we reached the end of the file! We assume this is an
|
||||
* error, and the input was truncated.
|
||||
*/
|
||||
fprintf(stderr, "EOF before end of stream: %zu\n", lastRet);
|
||||
exit(1);
|
||||
}
|
||||
return NULL;
|
||||
|
||||
}
|
||||
|
||||
void* reader_thread(void *arg) {
|
||||
struct reader_data *rdata = (struct reader_data *) arg;
|
||||
ConcurrentQueue<string> *trace_queue = (ConcurrentQueue<string>*) rdata->trace_queue;
|
||||
|
||||
ifstream trace_file;
|
||||
trace_file.open(rdata->trace_filename);
|
||||
while (trace_file.good()) {
|
||||
string line;
|
||||
getline(trace_file,line);
|
||||
trace_queue->enqueue(line);
|
||||
}
|
||||
string eof = "EOF";
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
trace_queue->enqueue(eof);
|
||||
if (hasEnding(rdata->trace_filename,".zst")) {
|
||||
//init
|
||||
const char *filename = rdata->trace_filename.c_str();
|
||||
FILE* const fin = fopen_orDie(filename, "rb");
|
||||
size_t const buffInSize = ZSTD_DStreamInSize()*1000;
|
||||
void* const buffIn = malloc_orDie(buffInSize);
|
||||
size_t const buffOutSize = ZSTD_DStreamOutSize()*1000;
|
||||
void* const buffOut = malloc_orDie(buffOutSize);
|
||||
|
||||
ZSTD_DCtx* const dctx = ZSTD_createDCtx();
|
||||
CHECK(dctx != NULL, "ZSTD_createDCtx() failed!");
|
||||
//char *leftover = malloc(buffOutSize);
|
||||
//memset(leftover,0,buffOutSize);
|
||||
//char *trace = (char*)decompress(filename);
|
||||
uint64_t nwrites = 0;
|
||||
uint64_t n = 0;
|
||||
char *trace = get_stream(dctx, fin, buffInSize, buffIn, buffOutSize, buffOut);
|
||||
while (trace != NULL) {
|
||||
char *ftrace = trace;
|
||||
char *line = NULL;
|
||||
char *line_p = (char*)calloc(2048,sizeof(char));
|
||||
while ((line = strsep(&trace,"\n"))) {
|
||||
if (strlen(line) < 2048) {
|
||||
strncpy(line_p,line,strlen(line));
|
||||
}
|
||||
string full_line(line);
|
||||
trace_queue->enqueue(full_line);
|
||||
n++;
|
||||
if (n % 1000000 == 0) fprintf(stderr,"decompressed requests: %lu, writes: %lu\n",n,nwrites);
|
||||
|
||||
}
|
||||
free(line_p);
|
||||
free(ftrace);
|
||||
trace = get_stream(dctx, fin, buffInSize, buffIn, buffOutSize, buffOut);
|
||||
}
|
||||
string eof = "EOF";
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
trace_queue->enqueue(eof);
|
||||
}
|
||||
if (trace) {
|
||||
free(trace);
|
||||
}
|
||||
ZSTD_freeDCtx(dctx);
|
||||
fclose_orDie(fin);
|
||||
free(buffIn);
|
||||
free(buffOut);
|
||||
|
||||
|
||||
} else {
|
||||
|
||||
ifstream trace_file;
|
||||
trace_file.open(rdata->trace_filename);
|
||||
while (trace_file.good()) {
|
||||
string line;
|
||||
getline(trace_file,line);
|
||||
trace_queue->enqueue(line);
|
||||
}
|
||||
string eof = "EOF";
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
trace_queue->enqueue(eof);
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user