C/C++教程

PostgreSQL数据库共享内存——开拓者CreateSharedMemoryAndSemaphores函数

本文主要是介绍PostgreSQL数据库共享内存——开拓者CreateSharedMemoryAndSemaphores函数,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

函数CreateSharedMemoryAndSemaphores负责对共享内存和信号量进行初始化,由Postmaster或其子进程调用。Postmaster调用该函数时会初始化共享内存和信号量,其他进程(如Postgres)调用时不进行初始化工作,仅仅获得已创建的共享变量指针和信号量指针。在初始化过程中,系统为共享内存创建了一个名为shmem index的Hash索引。当试图为一个模块分配共享内存时,会调用函数ShmemInitStruct(该函数首先根据模块名在Hash索引中查找,如果找不到则再调用ShmemAlloc函数在内存中为其分配一块区域)。函数CreateSharedMemoryAndSemaphores初始化流程如下:

  • 计算共享内存总共需要的大小
  • 分配共享内存空间和创建信号量
  • 初始化共享内存头指针
  • 注册清理函数
  • 构建共享内存Hash索引
  • 初始化各个模块,调用ShmemInitStruct函数从已分配的共享内存分配空间

计算共享内存总共需要的大小

共享内存使用项计算共享内存大小函数数量大小
基本100000
InitProcGlobal信号量ProcGlobalSemasMaxBackends + NUM_AUXILIARY_PROCS如果定义了USE_NAMED_POSIX_SEMAPHORES,返回0;否则(MaxBackends + 4)*sizeof(PGSemaphoreData)
spinlocks信号量SpinlockSemasNUM_EMULATION_SEMAPHORES(128 + 64)*sizeof(PGSemaphore)
哈希表hash_estimate_size(SHMEM_INDEX_SIZE,sizeof(ShmemIndexEnt))见注1+2+3+4见表下注释
buffer poolBufferShmemSizeNBuffers*sizeof(BufferDescPadded) + PG_CACHE_LINE_SIZE + NBuffers*BLCKSZ + StrategyShmemSize() + NBuffers*sizeof(LWLockMinimallyPadded) + NBuffers*sizeof(CkptSortItem)NBuffers*sizeof(BufferDescPadded) + 128 + NBuffers*BLCKSZ + BufTableShmemSize(NBuffers + NUM_BUFFER_PARTITIONS) + MAXALIGN(sizeof(BufferStrategyControl)) + NBuffers*sizeof(LWLockMinimallyPadded) + NBuffers*sizeof(CkptSortItem)
lock tablesLockShmemSizehash_estimate_size(max_table_size, sizeof(LOCK)) + hash_estimate_size(max_table_size*2, sizeof(PROCLOCK)) + size/10
predicate lock tablePredicateLockShmemSize
InitProcGlobalProcGlobalShmemSize见注5-12
XLOG shared memoryXLOGShmemSizesizeof(XLogCtlData) + sizeof(WALInsertLockPadded)*(NUM_XLOGINSERT_LOCKS + 1) + sizeof(XLogRecPtr)*XLOGbuffers + XLOG_BLCKSZ + XLOG_BLCKSZ*XLOGbuffers
CLOG shared memoryCLOGShmemSizeSimpleLruShmemSize(CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE)
CommitTs shared memoryCommitTsShmemSizeSimpleLruShmemSize(CommitTsShmemBuffers(), 0) + sizeof(CommitTimestampShared)
SUBTRANS shared memorySUBTRANSShmemSizeSimpleLruShmemSize(NUM_SUBTRANS_BUFFERS, 0)
TwoPhase shared memoryTwoPhaseShmemSizeoffsetof(TwoPhaseStateData, prepXacts) + max_prepared_xacts*sizeof(GlobalTransaction) + MAXALIGN(size) + max_prepared_xacts*sizeof(GlobalTransactionData)
BGworker shared memoryBackgroundWorkerShmemSizeoffsetof(BackgroundWorkerArray, slot) + max_worker_processes*sizeof(BackgroundWorkerSlot)
MultiXact shared memoryMultiXactShmemSizeSHARED_MULTIXACT_STATE_SIZE+SimpleLruShmemSize(NUM_MXACTOFFSET_BUFFERS, 0)+SimpleLruShmemSize(NUM_MXACTMEMBER_BUFFERS, 0)offsetof(MultiXactStateData, perBackendXactIds)+sizeof(MultiXactId)+sizeof(MultiXactId) * 2*MaxOldestSlot + SimpleLruShmemSize(NUM_MXACTOFFSET_BUFFERS, 0)+SimpleLruShmemSize(NUM_MXACTMEMBER_BUFFERS, 0)
LWLocks and named tranchesLWLockShmemSize(NUM_FIXED_LWLOCKS+NumLWLocksByNamedTranches())*sizeof(LWLockPadded)+sizeof(int) + LWLOCK_PADDED_SIZE+NamedLWLockTrancheRequests*sizeof(NamedLWLockTranche)+NamedLWLockTrancheRequests* (strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1)
CreateSharedProcArrayProcArrayShmemSizeoffsetof(ProcArrayStruct, pgprocnos)+sizeof(int)*PROCARRAY_MAXPROCS+(sizeof(TransactionId) +sizeof(bool))*TOTAL_MAX_CACHED_SUBXIDSoffsetof(ProcArrayStruct, pgprocnos)+sizeof(int)*(MaxBackends + max_prepared_xacts)+(sizeof(TransactionId) +sizeof(bool))* ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
CreateSharedBackendStatusBackendStatusShmemSizesizeof(PgBackendStatus)*NumBackendStatSlots+NAMEDATALEN*NumBackendStatSlots*2+pgstat_track_activity_query_size*NumBackendStatSlots+sizeof(PgBackendSSLStatus)*NumBackendStatSlots*2
SInvalSInvalShmemSizeoffsetof(SISeg, procState)+sizeof(ProcState)*MaxBackends
pmsignalPMSignalShmemSizeoffsetof(PMSignalData, PMChildFlags)+MaxLivePostmasterChildren()*sizeof(sig_atomic_t)
procsignalProcSignalShmemSizeNumProcSignalSlots * sizeof(ProcSignalSlot)
CheckpointerCheckpointerShmemSizeoffsetof(CheckpointerShmemStruct, requests)+NBuffers*sizeof(CheckpointerRequest)
autovacuumAutoVacuumShmemSizeMAXALIGN(sizeof(AutoVacuumShmemStruct))+autovacuum_max_workers*sizeof(WorkerInfoData)
ReplicationReplicationSlotsShmemSizeoffsetof(ReplicationSlotCtlData, replication_slots)+max_replication_slots*sizeof(ReplicationSlot)
ReplicationOriginShmemSizeoffsetof(ReplicationStateCtl, states)+max_replication_slots*sizeof(ReplicationState)
WalSndWalSndShmemSizeoffsetof(WalSndCtlData, walsnds)+max_wal_senders*sizeof(WalSnd)
WalRcvWalRcvShmemSizesizeof(WalRcvData)
replication launcherApplyLauncherShmemSizeMAXALIGN(sizeof(LogicalRepCtxStruct)) +max_logical_replication_workers*sizeof(LogicalRepWorker)
SnapSnapMgrShmemSizeoffsetof(OldSnapshotControlData, xid_by_minute)+sizeof(TransactionId)*OLD_SNAPSHOT_TIME_MAP_ENTRIES
BTreeBTreeShmemSizeoffsetof(BTVacInfo, vacuums)+MaxBackends* sizeof(BTOneVacInfo)
SyncScanSyncScanShmemSizeSizeOfScanLocations(SYNC_SCAN_NELEM)
AsyncAsyncShmemSize(MaxBackends + 1)* sizeof(QueueBackendStatus)+offsetof(AsyncQueueControl, backend)+SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0)
BackendShmemBackendArraySizeMaxLivePostmasterChildren()*sizeof(Backend)
total_addin_request
round offsize + 8192 - (size % 8192)

注:

  1. MAXALIGN(sizeof(HASHHDR))
  2. nDirEntries*sizeof(HASHSEGMENT)
  3. nSegments*MAXALIGN(DEF_SEGSIZE * sizeof(HASHBUCKET))–>next_pow2_long((nBuckets - 1)/256+1)*MAXALIGN(DEF_SEGSIZE * sizeof(HASHBUCKET))–>next_pow2_long((next_pow2_long((SHMEM_INDEX_SIZE - 1)+1) - 1)/256+1)*MAXALIGN(DEF_SEGSIZE * sizeof(HASHBUCKET))
  4. nElementAllocs*elementAllocCnt*elementSize–>((num_entries - 1) / elementAllocCnt + 1)*choose_nelem_alloc(entrysize)*(MAXALIGN(sizeof(HASHELEMENT)) + MAXALIGN(entrysize))–>((SHMEM_INDEX_SIZE - 1) / choose_nelem_alloc(sizeof(ShmemIndexEnt)) + 1)*choose_nelem_alloc(sizeof(ShmemIndexEnt))*(MAXALIGN(sizeof(HASHELEMENT)) + MAXALIGN(sizeof(ShmemIndexEnt)))
  5. hash_estimate_size(max_table_size,sizeof(PREDICATELOCKTARGET))–>hash_estimate_size(NPREDICATELOCKTARGETENTS(), sizeof(PREDICATELOCKTARGET))
  6. hash_estimate_size(max_table_size*2,sizeof(PREDICATELOCK))
  7. size+size / 10
  8. PredXactListDataSize+((MaxBackends + max_prepared_xacts)*10)*PredXactListElementDataSize
  9. hash_estimate_size((MaxBackends + max_prepared_xacts)*10,sizeof(SERIALIZABLEXID))
  10. sizeof(SHM_QUEUE)
  11. sizeof(OldSerXidControlData)
  12. SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0)

分配共享内存空间和创建信号量

分配共享内存空间

// Create the shmem segment
PGShmemHeader *seghdr;
PGShmemHeader *shim = NULL;
seghdr = PGSharedMemoryCreate(size, port, &shim);
InitShmemAccess(seghdr);

PGSharedMemoryCreate函数处于sysv_shmem.c文件中,创建共享内存段,并初始化其标准头,最后注册on_shmem_exit回调函数以释放存储。

PGShmemHeader *PGSharedMemoryCreate(Size size, int port, PGShmemHeader **shim)
{
	IpcMemoryKey NextShmemSegID;
	void	   *memAddress;
	PGShmemHeader *hdr;
	struct stat statbuf;
	Size		sysvsize;
	/* Complain if hugepages demanded but we can't possibly support them */
#if !defined(MAP_HUGETLB)
	if (huge_pages == HUGE_PAGES_ON)
		ereport(ERROR,(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),errmsg("huge pages not supported on this platform")));
#endif
	/* Room for a header? */
	Assert(size > MAXALIGN(sizeof(PGShmemHeader)));
	if (shared_memory_type == SHMEM_TYPE_MMAP){
		AnonymousShmem = CreateAnonymousSegment(&size);
		AnonymousShmemSize = size;
		/* Register on-exit routine to unmap the anonymous segment */
		on_shmem_exit(AnonymousShmemDetach, (Datum) 0);
		/* Now we need only allocate a minimal-sized SysV shmem block. */
		sysvsize = sizeof(PGShmemHeader);
	} else
		sysvsize = size;

	/* Loop till we find a free IPC key.  Trust CreateDataDirLockFile() to ensure no more than one postmaster per data directory can enter this loop simultaneously.  (CreateDataDirLockFile() does not ensure that, but prefer fixing it over coping here.) */
	NextShmemSegID = 1 + port * 1000;
	for (;;){
		IpcMemoryId shmid;
		PGShmemHeader *oldhdr;
		IpcMemoryState state;
		/* Try to create new segment */
		memAddress = InternalIpcMemoryCreate(NextShmemSegID, sysvsize);
		if (memAddress)
			break;				/* successful create and attach */
		/* Check shared memory and possibly remove and recreate */
		/* shmget() failure is typically EACCES, hence SHMSTATE_FOREIGN. ENOENT, a narrow possibility, implies SHMSTATE_ENOENT, but one can safely treat SHMSTATE_ENOENT like SHMSTATE_FOREIGN. */
		shmid = shmget(NextShmemSegID, sizeof(PGShmemHeader), 0);
		if (shmid < 0){
			oldhdr = NULL;
			state = SHMSTATE_FOREIGN;
		}else
			state = PGSharedMemoryAttach(shmid, NULL, &oldhdr);
		switch (state){
			case SHMSTATE_ANALYSIS_FAILURE:
			case SHMSTATE_ATTACHED:
				ereport(FATAL,(errcode(ERRCODE_LOCK_FILE_EXISTS),errmsg("pre-existing shared memory block (key %lu, ID %lu) is still in use",(unsigned long) NextShmemSegID,(unsigned long) shmid),errhint("Terminate any old server processes associated with data directory \"%s\".",DataDir)));
				break;
			case SHMSTATE_ENOENT:
				/* To our surprise, some other process deleted since our last InternalIpcMemoryCreate().  Moments earlier, we would have seen SHMSTATE_FOREIGN.  Try that same ID again. */
				elog(LOG,"shared memory block (key %lu, ID %lu) deleted during startup",(unsigned long) NextShmemSegID,(unsigned long) shmid);
				break;
			case SHMSTATE_FOREIGN:
				NextShmemSegID++;
				break;
			case SHMSTATE_UNATTACHED:

				/*
				 * The segment pertains to DataDir, and every process that had
				 * used it has died or detached.  Zap it, if possible, and any
				 * associated dynamic shared memory segments, as well.  This
				 * shouldn't fail, but if it does, assume the segment belongs
				 * to someone else after all, and try the next candidate.
				 * Otherwise, try again to create the segment.  That may fail
				 * if some other process creates the same shmem key before we
				 * do, in which case we'll try the next key.
				 */
				if (oldhdr->dsm_control != 0)
					dsm_cleanup_using_control_segment(oldhdr->dsm_control);
				if (shmctl(shmid, IPC_RMID, NULL) < 0)
					NextShmemSegID++;
				break;
		}
		if (oldhdr && shmdt(oldhdr) < 0)
			elog(LOG, "shmdt(%p) failed: %m", oldhdr);
	}
	/* Initialize new segment. */
	hdr = (PGShmemHeader *) memAddress;
	hdr->creatorPID = getpid();
	hdr->magic = PGShmemMagic;
	hdr->dsm_control = 0;
	/* Fill in the data directory ID info, too */
	if (stat(DataDir, &statbuf) < 0)
		ereport(FATAL,(errcode_for_file_access(),errmsg("could not stat data directory \"%s\": %m",DataDir)));
	hdr->device = statbuf.st_dev;
	hdr->inode = statbuf.st_ino;
	/* Initialize space allocation status for segment. */
	hdr->totalsize = size;
	hdr->freeoffset = MAXALIGN(sizeof(PGShmemHeader));
	*shim = hdr;
	/* Save info for possible future use */
	UsedShmemSegAddr = memAddress;
	UsedShmemSegID = (unsigned long) NextShmemSegID;

	/*
	 * If AnonymousShmem is NULL here, then we're not using anonymous shared
	 * memory, and should return a pointer to the System V shared memory
	 * block. Otherwise, the System V shared memory block is only a shim, and
	 * we must return a pointer to the real block.
	 */
	if (AnonymousShmem == NULL)
		return hdr;
	memcpy(AnonymousShmem, hdr, sizeof(PGShmemHeader));
	return (PGShmemHeader *) AnonymousShmem;
}

CreateAnonymousSegment函数创建匿名mmap共享内存段

static void *
CreateAnonymousSegment(Size *size)
{
	Size		allocsize = *size;
	void	   *ptr = MAP_FAILED;
	int			mmap_errno = 0;

#ifndef MAP_HUGETLB
	/* PGSharedMemoryCreate should have dealt with this case */
	Assert(huge_pages != HUGE_PAGES_ON);
#else
	if (huge_pages == HUGE_PAGES_ON || huge_pages == HUGE_PAGES_TRY)
	{
		/*
		 * Round up the request size to a suitable large value.
		 */
		Size		hugepagesize;
		int			mmap_flags;

		GetHugePageSize(&hugepagesize, &mmap_flags);

		if (allocsize % hugepagesize != 0)
			allocsize += hugepagesize - (allocsize % hugepagesize);

		ptr = mmap(NULL, allocsize, PROT_READ | PROT_WRITE,
				   PG_MMAP_FLAGS | mmap_flags, -1, 0);
		mmap_errno = errno;
		if (huge_pages == HUGE_PAGES_TRY && ptr == MAP_FAILED)
			elog(DEBUG1, "mmap(%zu) with MAP_HUGETLB failed, huge pages disabled: %m",
				 allocsize);
	}
#endif

	if (ptr == MAP_FAILED && huge_pages != HUGE_PAGES_ON)
	{
		/*
		 * Use the original size, not the rounded-up value, when falling back
		 * to non-huge pages.
		 */
		allocsize = *size;
		ptr = mmap(NULL, allocsize, PROT_READ | PROT_WRITE,
				   PG_MMAP_FLAGS, -1, 0);
		mmap_errno = errno;
	}

	if (ptr == MAP_FAILED)
	{
		errno = mmap_errno;
		ereport(FATAL,
				(errmsg("could not map anonymous shared memory: %m"),
				 (mmap_errno == ENOMEM) ?
				 errhint("This error usually means that PostgreSQL's request "
						 "for a shared memory segment exceeded available memory, "
						 "swap space, or huge pages. To reduce the request size "
						 "(currently %zu bytes), reduce PostgreSQL's shared "
						 "memory usage, perhaps by reducing shared_buffers or "
						 "max_connections.",
						 *size) : 0));
	}

	*size = allocsize;
	return ptr;
}

static void
AnonymousShmemDetach(int status, Datum arg)
{
	/* Release anonymous shared memory block, if any. */
	if (AnonymousShmem != NULL)
	{
		if (munmap(AnonymousShmem, AnonymousShmemSize) < 0)
			elog(LOG, "munmap(%p, %zu) failed: %m",
				 AnonymousShmem, AnonymousShmemSize);
		AnonymousShmem = NULL;
	}
}

创建信号量

// Create semaphores
PGReserveSemaphores(numSemas, port);

构建共享内存Hash索引

[图：共享内存 shmem index 哈希索引结构示意图 —— 原文插图在转载时丢失]

https://blog.csdn.net/BeiiGang/article/details/7288763

这篇关于PostgreSQL数据库共享内存——开拓者CreateSharedMemoryAndSemaphores函数的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!