1 创建序列

每台主机每个表都需要单独创建序列

  • 创建序列的目的是用于快速生成snowflakeId.每次将新生成的snowflakeId保存在序列中,然后用于计算新的snowflakeId.
  • 序列的命名规则:seq_“机器编号”_“表名”_id
drop sequence if exists seq_1_table_id;
create sequence if not exists seq_1_table_id
as bigint
	increment 1
    minvalue 0	
	maxvalue 9223372036854775807
	start 0
	cache 1;
--重置序列
select setval('seq_1_table_id'::regclass,0);

2 创建生成snowflakeId函数

  • generateId依赖序列
  • generateId是公共函数,第一个参数为表的序列,第二个参数为机器编号(值范围为0-1023)
  • 注意第一个参数要和第二个参数相匹配
  • 例如机器号为2的主机, 序列名应该为"seq_2_table_id",机器id为2

2.1 创建生成snowflakeId函数(uninx时间戳)

  • 因为使用的是uninx时间戳,最多可以使用至2039-09-07 23:47:35.551+08
drop function if exists generateId(regclass,int);
create or replace function generateId(regclass,imachineId int)
    returns bigint
as $$
    --4095      =0x0000000000000FFF
    --4190208   =0x00000000003FF000
	select 
        (case when (sn>>22)=milliseconds and ((sn&4190208)>>12)=$2 then	
            setval($1,(milliseconds<<22)|($2<<12)|((sn&4095)))    --sn nextval时已经加1
        else
            setval($1,(milliseconds<<22)|($2<<12)|1)
        end)
    from (select nextval($1) as sn,(extract(epoch from clock_timestamp())*1000)::bigint as milliseconds) as tmp;
$$ language sql;

2.2 创建生成snowflakeId函数(时间偏移从2021-01-01开始)

  • 时间偏移从2021-01-01开始,最多可以使用至2090-09-07 15:47:35.551+08
drop function if exists generateIdOffset(regclass,int);
create or replace function generateIdOffset(regclass,imachineId int)
    returns bigint
as $$
	--1609430400000 =2021-01-01 00:00:00.000000
    --4095          =0x0000000000000FFF
    --4190208       =0x00000000003FF000
	select 
        (case when (sn>>22)=milliseconds and ((sn&4190208)>>12)=$2 then	
            setval($1,(milliseconds<<22)|($2<<12)|((sn&4095)))    --sn nextval时已经加1
        else
            setval($1,(milliseconds<<22)|($2<<12)|1)
        end)
    from (select nextval($1) as sn,(extract(epoch from clock_timestamp())*1000)::bigint-1609430400000  as milliseconds) as tmp;
$$ language sql;

3 测试

snowflakeId生成速度约90个/毫秒

3.1 uninx时间戳

select 
	id,id>>22,(id&4190208)>>12,id&4095
from(
    select generateId('public.seq_1_table_id'::regclass,1) as id from generate_series(1,10000)
) as tmp;

3.2 时间偏移从2021-01-01开始

select 
	id,1609430400000 +(id>>22),(id&4190208)>>12,id&4095
from(
    select  generateIdOffset('public.seq_1_table_id'::regclass,1) as id from generate_series(1,10000)
) as tmp;

4 C语言结合PG Hasmap实现

snowflakeId生成速度约700个/毫秒/每核,是使用序列的10倍.
以8核心cpu为生成snowflakeId为例: 500* 1000 * 8 = 4000000个/秒,取500个/毫秒/每核的中间值排除锁\cpu切换时间.

/**
*		根据表OID和机器编号生成新的ID
*		生成速度约为每核心每毫秒700个id
*
*		iRelId:表注册的id, 示例:'public.paramtest'::regclass::oid
*		iMachineId:机器编号, 值范围从1 - select (1<<iMachineBinLen)-1
*		iMachineBinLen:机器编号使用的字节数, iMachineBinLen+iSnBinLen必须小于等于22
*		iSnBinLen:序列号使用的字节数, iMachineBinLen+iSnBinLen必须小于等于22
*		imillisecondOffset:从1970-01-01偏移多少毫秒,只能是正数,计算时要加上此偏移量
*
*		drop function if exists generate_snowflake_v3(Oid,integer,integer,integer,bigint);
*		drop function if exists generateSnowflakeV3(Oid,integer,integer,integer,bigint);
*/
create function generateSnowflakeV3(
	iRelId Oid,
	iMachineId integer,
	iMachineBinLen integer default 10,
	iSnBinLen integer default 12,
	iMillisecondOffset bigint default 1577836800000
)	returns bigint 
as 'pg_kmcb', 'generateSnowflakeV3'
language C strict;
Datum generateSnowflakeV3(PG_FUNCTION_ARGS) {
	bool	 found;
	Oid relid;
	int64 time, millisecondOffset;
	struct SeHashKey searchKey;
	struct SeHashEntry* entry = NULL;
	struct timeval tp;
	int32 machineId, timeOffset, machineBinLen, snBinLen, sn;



	if (NULL == pgState || NULL == pgHash)
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			errmsg("pg_kmb must be loaded via shared_preload_libraries")));

	relid = PG_GETARG_OID(0);
	machineId = PG_GETARG_INT32(1);
	machineBinLen = PG_GETARG_INT32(2);
	snBinLen = PG_GETARG_INT32(3);
	millisecondOffset = PG_GETARG_INT64(4);

	if (machineId < 1)
		ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
			errmsg("The parameter iMachineId must be greater than 1.")));
	if (machineBinLen < 1)
		ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
			errmsg("The parameter iMachineBinLen must be greater than 1.")));
	if (snBinLen < 1)
		ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
			errmsg("The parameter iSnBinLen must be greater than 1.")));
	if (millisecondOffset < 0)
		ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
			errmsg("The parameter iMillisecondOffset must be greater than 0.")));

	if (machineId > ((1 << machineBinLen) - 1))
		ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
			errmsg("The parameter iMachineId out of range.")));
	timeOffset = (machineBinLen + snBinLen);

	if (timeOffset != 22)
		ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
			errmsg("iMachineBinLen and  iSnBinLen out of range(iMachineBinLen + iSnBinLen=22).")));


	//获取时间
	gettimeofday(&tp, NULL);
	time = (
		(((int64)tp.tv_sec) * INT64CONST(1000))
		+ (((int64)tp.tv_usec) / INT64CONST(1000))
		) - millisecondOffset;

	//hash操作
	searchKey.relid = relid;
	searchKey.machineId = machineId;
	LWLockAcquire(pgState->lock, LW_SHARED);
	entry = (struct SeHashEntry*)hash_search(pgHash, &searchKey, HASH_FIND, NULL);
	if (NULL == entry) {
		LWLockRelease(pgState->lock);
		LWLockAcquire(pgState->lock, LW_EXCLUSIVE);
		entry = (struct SeHashEntry*)hash_search(pgHash, &searchKey, HASH_ENTER_NULL, &found);
		if (!found) {
			SpinLockInit(&entry->mutex);
			SpinLockAcquire(&entry->mutex);
			entry->millisecond = time;
			entry->sn = 1;
			SpinLockRelease(&entry->mutex);
			sn = 1;
		} else { /*永远不会执行*/
			SpinLockAcquire(&entry->mutex);
			sn = entry->sn;
			SpinLockRelease(&entry->mutex);
		}
	} else {
		SpinLockAcquire(&entry->mutex);
		if (time == entry->millisecond) {
			++entry->sn;
		} else {
			entry->millisecond = time;
			entry->sn = 1;
		}
		sn = entry->sn;
		SpinLockRelease(&entry->mutex);
	}
	LWLockRelease(pgState->lock);
	PG_RETURN_INT64(
		(time << timeOffset) |
		(((int64)machineId) << snBinLen) |
		(sn)
	);
}
Logo

鸿蒙生态一站式服务平台。

更多推荐