/****************************************************************************** VERSION $Id$ PACKAGE: User Level Shared Memory Manager DESCRIPTION: This package provides a buffer pool interface implemented as a collection of file pages mapped into shared memory. Based on Mark's buffer manager ROUTINES: External buf_alloc buf_flags buf_get buf_init buf_last buf_open buf_pin buf_sync buf_unpin Internal bf_assign_buf bf_fid_to_fd bf_newbuf bf_put_page ******************************************************************************/ #include #include #include #include #include #include #include "list.h" #include "user.h" #include "txn_sys.h" #include "buf.h" #include "semkeys.h" #include "error.h" /* we need to translate between some type of file id that the user process passes and a file descriptor. For now, it's a nop. */ #define GET_MASTER get_sem ( buf_spinlock ) #define RELEASE_MASTER release_sem ( buf_spinlock ) #define LRUID *buf_lru #define LRUP (bufhdr_table+*buf_lru) #define MRU bufhdr_table[*buf_lru].lru.prev /* Global indicator that you have started reusing buffers */ int do_statistics = 0; /* Process Statics (pointers into shared memory) */ static BUF_T *buf_table = 0; static BUFHDR_T *bufhdr_table; static int *buf_hash_table; static int *buf_lru; /* LRU is the free list */ static int buf_spinlock; static FINFO_T *buf_fids; static int *buf_sp; /* Pointer to string free space */ static char *buf_strings; /* Process Local FID->FD table */ static int fds[NUM_FILE_ENTRIES]; /* Static routines */ static BUFHDR_T *bf_assign_buf(); static int bf_fid_to_fd(); static BUFHDR_T *bf_newbuf(); static int bf_put_page(); /* Return 0 on success 1 on failure */ extern int buf_init ( ) { ADDR_T buf_region; BUFHDR_T *bhp; int i; int ref_count; int *spinlockp; /* Initialize Process local structures */ for ( i = 0; i < NUM_FILE_ENTRIES; i++ ) { fds[i] = -1; } buf_region = attach_region ( BUF_REGION_NAME, BUF_REGION_NUM, BUF_REGION_SIZE, &ref_count ); if ( !buf_region ) { return 
(1); } error_log3 ( "Buf Region: ADDR: %d ID: %d SIZE: %d\n", buf_region, BUF_REGION_NUM, BUF_REGION_SIZE ); buf_table = (BUF_T *)buf_region; bufhdr_table = (BUFHDR_T *)(buf_table + NUM_BUFS); buf_hash_table = (int *)(bufhdr_table + NUM_BUFS); buf_lru = buf_hash_table + NUMTABLE_ENTRIES; spinlockp = buf_lru + 1; buf_fids = (FINFO_T *)(spinlockp+1); buf_sp = (int *)(buf_fids + NUM_FILE_ENTRIES); buf_strings = (char *)(buf_sp + 1); /* Create locking spinlock (gets creating holding the lock) */ buf_spinlock = create_sem ( BUF_SPIN_NAME, BUF_SPIN_NUM, ref_count <= 1 ); if ( buf_spinlock < 0 ) { return(1); } if ( ref_count <= 1 ) { *spinlockp = buf_spinlock; /* Now initialize the buffer manager */ /* 1. Free list */ *buf_lru = 0; /* 2. Buffer headers */ for ( i = 0, bhp = bufhdr_table; i < NUM_BUFS; bhp++, i++ ) { bhp->lru.next = i+1; bhp->lru.prev = i-1; bhp->flags = 0; /* All Flags off */ bhp->refcount = 0; bhp->wait_proc = -1; /* No sleepers */ LISTPE_INIT ( hash, bhp, i ); /* Hash chains */ } bufhdr_table[0].lru.prev = NUM_BUFS-1; bufhdr_table[NUM_BUFS-1].lru.next = 0; /* 3. Hash Table */ for ( i = 0; i < NUMTABLE_ENTRIES; i++ ) { buf_hash_table[i] = NUM_BUFS; } /* 4. File ID Table */ for ( i = 0; i < NUM_FILE_ENTRIES; i++ ) { buf_fids[i].offset = -1; buf_fids[i].npages = -1; buf_fids[i].refcount = 0; } /* 5. Free String Pointer */ *buf_sp = (FILE_NAME_LEN*NUM_FILE_ENTRIES); if (RELEASE_MASTER) { return(1); } error_log0 ( "Initialized buffer region\n" ); } return (0); } extern void buf_exit() { int ref; int i; /* Flush Buffer Pool on Exit */ for ( i = 0; i < NUM_FILE_ENTRIES; i++ ) { if ( fds[i] != -1 ) { close ( fds[i] ); } } if ( buf_table ) { detach_region ( buf_table, BUF_REGION_NUM, BUF_REGION_SIZE, &ref ); } return; } /* We need an empty buffer. Find the LRU unpinned NON-Dirty page. 
*/
/*
    bf_newbuf

    Pick a buffer header to reuse and return it, or NULL on failure.

    The LRU ring is walked starting at the LRU head, skipping every header
    that is pinned or has I/O in progress.  If the walk gets back to the
    head without finding a candidate, the failure is logged and NULL is
    returned.

    Despite the comment above, a dirty candidate is accepted: it is written
    out first through bf_put_page, and do_statistics is set to 1.  NULL is
    returned if the owning file cannot be opened or the write fails.

    The chosen header is then unlinked from its hash chain, the bucket head
    is repaired if it pointed at this header, and INIT_BUF is applied.

    NOTE(review): the bucket repair reads bhp->hash.next after LISTP_REMOVE
    has run; whether that link is still meaningful depends on the macro in
    list.h -- confirm.
*/
static BUFHDR_T *
bf_newbuf()
{
    int fd;
    int lruid;
    int nbytes;
    int ndx;
    BUFHDR_T *bhp;

    /* Remember the starting point so a full lap can be detected */
    lruid = LRUID;
    for ( bhp = LRUP;
          bhp->flags & (BUF_PINNED|BUF_IO_IN_PROGRESS);
          bhp = LISTP_NEXTP (bufhdr_table, lru, bhp ) ) {
        if ( bhp->lru.next == lruid ) {
            /* OUT OF BUFFERS */
            error_log1 ( "All buffers are pinned.  %s\n",
                         "Unable to grant buffer request" );
            return(NULL);
        }
    }
    /* BHP can be used */
    if ( bhp->flags & BUF_DIRTY ) {
        /* Victim holds unwritten data: flush it before reuse */
        do_statistics = 1;
        /* MIS Check for log flushed appropriately */
        fd = bf_fid_to_fd(bhp->id.file_id);
        if ( fd == -1 ) {
            error_log1 ("Invalid fid %d\n", bhp->id.file_id);
            return(NULL);
        }
        if ( bf_put_page(fd, bhp) < 0 ) {
            return(NULL);
        }
    }
    /* Update Hash Pointers */
    /* ndx is the bucket of the page currently held by the victim */
    ndx = BUF_HASH ( bhp->id.file_id, bhp->id.obj_id );
    LISTP_REMOVE(bufhdr_table, hash, bhp);
    if ( buf_hash_table[ndx] == (bhp-bufhdr_table) ) {
        /* Victim was the bucket head: promote its successor, or mark
           the bucket empty (NUM_BUFS) if it was alone on the chain */
        if ( bhp->hash.next != (bhp-bufhdr_table) ) {
            buf_hash_table[ndx] = bhp->hash.next;
        } else {
            buf_hash_table[ndx] = NUM_BUFS;
        }
    }
    INIT_BUF(bhp);
    return(bhp);
}

/*
    buf_alloc

    Add a page to a file and return a buffer for it.
*/ ADDR_T buf_alloc ( fid, new_pageno ) int fid; int *new_pageno; { BUFHDR_T *bhp; int fd; int len; int ndx; OBJ_T fobj; if (GET_MASTER) { return(NULL); } if ( buf_fids[fid].npages == -1 ) { /* initialize npages field */ fd = bf_fid_to_fd ( fid ); } assert (fid < NUM_FILE_ENTRIES); *new_pageno = buf_fids[fid].npages; if ( *new_pageno == -1 ) { RELEASE_MASTER; return ( NULL ); } buf_fids[fid].npages++; ndx = BUF_HASH ( fid, *new_pageno ); fobj.file_id = fid; fobj.obj_id = *new_pageno; bhp = bf_assign_buf ( ndx, &fobj, BF_PIN|BF_DIRTY|BF_EMPTY, &len ); if ( RELEASE_MASTER ) { /* Memory leak */ return(NULL); } if ( bhp ) { return ((ADDR_T)(buf_table+(bhp-bufhdr_table))); } else { return ( NULL ); } } /* Buffer Flags BF_DIRTY Mark page as dirty BF_EMPTY Don't initialize page, just get buffer BF_PIN Retrieve with pin MIS Might want to add a flag that sets an LSN for this buffer is the DIRTY flag is set Eventually, you may want a flag that indicates the I/O and lock request should be shipped off together, but not for now. */ extern ADDR_T buf_get ( file_id, page_id, flags, len ) int file_id; int page_id; u_long flags; int *len; /* Number of bytes read into buffer */ { BUFHDR_T *bhp; int bufid; int fd; int ndx; int next_bufid; int stat; OBJ_T fobj; ndx = BUF_HASH ( file_id, page_id ); fobj.file_id = (long) file_id; fobj.obj_id = (long) page_id; if ( GET_MASTER ) { return(NULL); } /* This could be a for loop, but we lose speed by making all the cases general purpose so we optimize for the no-collision case. 
*/ bufid = buf_hash_table[ndx]; if ( bufid < NUM_BUFS ) { for ( bhp = bufhdr_table+bufid; !OBJ_EQ (bhp->id, fobj) || !(bhp->flags & BUF_VALID); bhp = LISTP_NEXTP ( bufhdr_table, hash, bhp ) ) { if ( bhp->hash.next == bufid ) { goto not_found; } } /* found */ if ( flags & BF_PIN ) { bhp->flags |= BUF_PINNED; bhp->refcount++; #ifdef PIN_DEBUG fprintf(stderr, "buf_get: %X PINNED (%d)\n", buf_table + (bhp-bufhdr_table), bhp->refcount); #endif } if ( flags & BF_DIRTY ) { bhp->flags |= BUF_DIRTY; } while ( bhp->flags & BUF_IO_IN_PROGRESS ) { /* MIS -- eventually err check here */ #ifdef DEBUG printf("About to sleep on %d (me: %d\n)\n", bhp->wait_proc, my_txnp - txn_table); #endif #ifdef WAIT_STATS buf_waits++; #endif stat = proc_sleep_on ( &(bhp->wait_proc), buf_spinlock ); if ( stat ) { /* Memory leak */ return(NULL); } if (!( bhp->flags & BUF_IO_IN_PROGRESS) && (!OBJ_EQ (bhp->id, fobj) || !(bhp->flags & BUF_VALID))) { if (RELEASE_MASTER) return(NULL); return(buf_get ( file_id, page_id, flags, len )); } } MAKE_MRU( bhp ); *len = BUFSIZE; } else { not_found: /* If you get here, the page isn't in the hash table */ bhp = bf_assign_buf ( ndx, &fobj, flags, len ); } /* Common code between found and not found */ if ( bhp && bhp->flags & BUF_NEWPAGE ) { *len = 0; } if (RELEASE_MASTER){ /* Memory leak */ return(NULL); } if ( bhp ) { return ((ADDR_T)(buf_table+(bhp-bufhdr_table))); } else { return ( NULL ); } } /* MIS - do I want to add file links to buffer pool? 
*/ extern int buf_sync ( fid, close ) int fid; int close; /* should we dec refcount and possibly invalidate all the buffers */ { int i; int fd; int invalidate; BUFHDR_T *bhp; if ( (fd = bf_fid_to_fd ( fid )) < 0 ) { return(1); } if (GET_MASTER) { return(1); } invalidate = (buf_fids[fid].refcount == 1 && close); if ( invalidate ) for ( bhp = bufhdr_table, i = 0; i < NUM_BUFS; bhp++, i++ ) { if (bhp->id.file_id == fid) { if ((bhp->flags & BF_DIRTY) && (bf_put_page( fd, bhp ) < 0)) { return(1); } bhp->id.file_id = -1; } } if (invalidate || close) buf_fids[fid].refcount--; if (RELEASE_MASTER) { return(1); } return(0); } extern int buf_flags ( addr, set_flags, unset_flags ) ADDR_T addr; u_long set_flags; u_long unset_flags; { int bufid; BUFHDR_T *bhp; #ifdef PIN_DEBUG fprintf(stderr, "buf_flags: %X setting %s%s%s%s%s releasing %s%s%s%s%s\n", addr, set_flags&BUF_DIRTY ? "DIRTY " : "", set_flags&BUF_VALID ? "VALID " : "", set_flags&BUF_PINNED ? "PINNED " : "", set_flags&BUF_IO_ERROR ? "IO_ERROR " : "", set_flags&BUF_IO_IN_PROGRESS ? "IO_IN_PROG " : "", set_flags&BUF_NEWPAGE ? "NEWPAGE " : "", unset_flags&BUF_DIRTY ? "DIRTY " : "", unset_flags&BUF_VALID ? "VALID " : "", unset_flags&BUF_PINNED ? "PINNED " : "", unset_flags&BUF_IO_ERROR ? "IO_ERROR " : "", unset_flags&BUF_IO_IN_PROGRESS ? "IO_IN_PROG " : "", unset_flags&BUF_NEWPAGE ? 
"NEWPAGE " : "" ); #endif if (!ADDR_OK(addr)) { error_log1 ( "buf_pin: Invalid Buffer Address %x\n", addr ); return(1); } bufid = ((BUF_T *)addr) - buf_table; assert ( bufid < NUM_BUFS); bhp = &bufhdr_table[bufid]; if (GET_MASTER) { return(1); } bhp->flags |= set_flags; if ( set_flags & BUF_PINNED ) { bhp->refcount++; } if ( set_flags & BUF_DIRTY ) { unset_flags |= BUF_NEWPAGE; } if ( unset_flags & BUF_PINNED ) { bhp->refcount--; if ( bhp->refcount ) { /* Turn off pin bit so it doesn't get unset */ unset_flags &= ~BUF_PINNED; } } bhp->flags &= ~unset_flags; MAKE_MRU(bhp); if (RELEASE_MASTER) { return(1); } return(0); } /* Take a string name and produce an fid. returns -1 on error MIS -- this is a potential problem -- you keep actual names here -- what if people run from different directories? */ extern int buf_name_lookup ( fname ) char *fname; { int i; int fid; int ndx; fid = -1; if (GET_MASTER) { return(-1); } for ( i = 0; i < NUM_FILE_ENTRIES; i++ ) { if ( buf_fids[i].offset == -1 ) { fid = i; } else { if (!strcmp (fname, buf_strings+buf_fids[i].offset)) { if (RELEASE_MASTER) { return(-1); } buf_fids[i].refcount++; return(i); } } } if ( fid == -1 ) { error_log0 ( "No more file ID's\n" ); } else { ndx = *buf_sp - strlen(fname) - 1; if ( ndx < 0 ) { error_log0 ( "Out of string space\n" ); fid = -1; } else { *buf_sp = ndx; strcpy ( buf_strings+ndx, fname ); buf_fids[fid].offset = ndx; } buf_fids[fid].refcount = 1; } if (RELEASE_MASTER) { return(-1); } return(fid); } static int bf_fid_to_fd ( fid ) int fid; { struct stat sbuf; assert ( (fid < NUM_FILE_ENTRIES) && (buf_fids[fid].offset != -1) ); if ( fds[fid] != -1 ) { return(fds[fid]); } fds[fid] = open ( buf_strings+buf_fids[fid].offset, O_RDWR|O_CREAT, 0666 ); if ( fds[fid] < 0 ) { error_log3 ( "Error Opening File %s FID: %d FD: %d. 
Errno = %d\n", buf_strings+buf_fids[fid].offset, fid, fds[fid], errno ); return(-1); } error_log3 ( "Opening File %s FID: %d FD: %d\n", buf_strings+buf_fids[fid].offset, fid, fds[fid] ); if ( buf_fids[fid].npages == -1 ) { /* Initialize the npages field */ if ( fstat ( fds[fid], &sbuf ) ) { error_log3 ( "Error Fstating %s FID: %d. Errno = %d\n", buf_strings+buf_fids[fid].offset, fid, errno ); } else { buf_fids[fid].npages = ( sbuf.st_size / BUFSIZE ); } } return ( fds[fid] ); } static int bf_put_page ( fd, bhp ) int fd; BUFHDR_T *bhp; { int nbytes; assert ( (bhp-bufhdr_table) < NUM_BUFS ); if ( lseek ( fd, bhp->id.obj_id << BUFSHIFT, L_SET ) < 0 ) { return(-1); } bhp->flags |= BUF_IO_IN_PROGRESS; if (RELEASE_MASTER) { return(-1); } nbytes = write(fd, buf_table[bhp-bufhdr_table], BUFSIZE); if (GET_MASTER) { return(-2); } if ( nbytes < 0 ) { error_log1 ("Write failed with error code %d\n", errno); return(-1); } else if ( nbytes != BUFSIZE ) { error_log1 ("Short write: %d bytes of %d\n", nbytes, BUFSIZE ); } bhp->flags &= ~(BUF_DIRTY|BUF_IO_IN_PROGRESS); return (0); } static BUFHDR_T * bf_assign_buf ( ndx, obj, flags, len ) int ndx; OBJ_T *obj; u_long flags; int *len; /* Number of bytes read */ { BUFHDR_T *bhp; int fd; assert ( obj->file_id < NUM_FILE_ENTRIES ); bhp = bf_newbuf(); if ( !bhp ) { return(NULL); } OBJ_ASSIGN ( (*obj), bhp->id ); if ( buf_hash_table[ndx] >= NUM_BUFS ) { buf_hash_table[ndx] = bhp-bufhdr_table; } else { LISTPE_INSERT ( bufhdr_table, hash, bhp, buf_hash_table[ndx] ); } bhp->flags |= BUF_VALID; if ( flags & BF_PIN ) { bhp->flags |= BUF_PINNED; bhp->refcount++; #ifdef PIN_DEBUG fprintf(stderr, "bf_assign_buf: %X PINNED (%d)\n", buf_table + (bhp-bufhdr_table), bhp->refcount); #endif } fd = bf_fid_to_fd(obj->file_id); if ( fd == -1 ) { error_log1 ("Invalid fid %d\n", obj->file_id); bhp->flags |= ~BUF_IO_ERROR; return(NULL); } if ( obj->obj_id >= buf_fids[obj->file_id].npages) { buf_fids[obj->file_id].npages = obj->obj_id+1; *len = 0; } else if ( 
flags & BF_EMPTY ) { *len = 0; } else { bhp->flags |= BUF_IO_IN_PROGRESS; if (RELEASE_MASTER) { return(NULL); } if ( lseek ( fd, obj->obj_id << BUFSHIFT, L_SET ) < -1 ) { error_log2 ("Unable to perform seek on file: %d to page %d", obj->file_id, obj->obj_id ); bhp->flags &= ~BUF_IO_IN_PROGRESS; bhp->flags |= ~BUF_IO_ERROR; return(NULL); } *len = read(fd, buf_table[bhp-bufhdr_table], BUFSIZE); if ( *len < 0 ) { error_log2 ("Unable to perform read on file: %d to page %d", obj->file_id, obj->obj_id ); bhp->flags &= ~BUF_IO_IN_PROGRESS; bhp->flags |= ~BUF_IO_ERROR; return(NULL); } if (GET_MASTER) { return(NULL); } bhp->flags &= ~BUF_IO_IN_PROGRESS; if ( bhp->wait_proc != -1 ) { /* wake up waiter and anyone waiting on it */ #ifdef DEBUG printf("Waking transaction %d due to completed I/O\n", bhp->wait_proc); #endif proc_wake_id ( bhp->wait_proc ); bhp->wait_proc = -1; } MAKE_MRU(bhp); } if ( flags & BF_DIRTY ) { bhp->flags |= BUF_DIRTY; } else if ( *len < BUFSIZE ) { bhp->flags |= BUF_NEWPAGE; } return ( bhp ); } int buf_last ( fid ) int fid; { int val; if (GET_MASTER) { return(-1); } assert ( fid < NUM_FILE_ENTRIES ); if ( buf_fids[fid].npages == -1 ) { /* initialize npages field */ (void) bf_fid_to_fd ( fid ); } val = buf_fids[fid].npages; if ( val ) { val--; /* Convert to page number */ } if (RELEASE_MASTER) { return(-1); } return(val); } #ifdef DEBUG extern void buf_dump ( id, all ) int id; int all; { int i; BUFHDR_T *bhp; printf ( "LRU + %d\n", *buf_lru ); if ( all ) { printf("ID\tFID\tPID\tLNEXT\tLPREV\tHNEXT\tHPREV\tSLEEP\tFLAG\tREFS\n"); for ( bhp = bufhdr_table, i = 0; i < NUM_BUFS; bhp++, i++ ) { printf ( "%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%x\t%d\n", i, bhp->id.file_id, bhp->id.obj_id, bhp->lru.next, bhp->lru.prev, bhp->hash.next, bhp->hash.prev, bhp->wait_proc, bhp->flags, bhp->refcount ); } } else { if ( id >= NUM_BUFS ) { printf ( "Buffer ID (%d) too high\n", id ); return; } bhp = bufhdr_table+id; printf ( "%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%x\t%d\n", i, 
bhp->id.file_id, bhp->id.obj_id, bhp->lru.next, bhp->lru.prev, bhp->hash.next, bhp->hash.prev, bhp->wait_proc, bhp->flags, bhp->refcount ); } return; } #endif