PostgreSQL分布式id-雪花算法snowflake(2)
1 创建序列每个表都需要单独创建序列创建序列的目的是用于快速生成snowflakeId.每次将新生成的snowflakeId保存在序列中,然后用于计算新的snowflakeId.序列的命名规则:seq_“表名”_iddrop sequence if exists seq_table_id;create sequence if not exists seq_table_idas bigintincr
·
1 创建序列
每台主机每个表都需要单独创建序列
- 创建序列的目的是用于快速生成snowflakeId.每次将新生成的snowflakeId保存在序列中,然后用于计算新的snowflakeId.
- 序列的命名规则:seq_“机器编号”_“表名”_id
drop sequence if exists seq_1_table_id;
create sequence if not exists seq_1_table_id
as bigint
increment 1
minvalue 0
maxvalue 9223372036854775807
start 0
cache 1;
--重置序列
select setval('seq_1_table_id'::regclass,0);
2 创建生成snowflakeId函数
- generateId依赖序列
- generateId是公共函数,第一个参数为表的序列,第二个参数为机器编号(值范围为0-1023)
- 注意第一个参数要和第二个参数相匹配
- 例如机器号为2的主机, 序列名应该为"seq_2_table_id",机器id为2
2.1 创建生成snowflakeId函数(uninx时间戳)
- 因为使用的是uninx时间戳,最多可以使用至2039-09-07 23:47:35.551+08
drop function if exists generateId(regclass,int);
create or replace function generateId(regclass,imachineId int)
returns bigint
as $$
--4095 =0x0000000000000FFF
--4190208 =0x00000000003FF000
select
(case when (sn>>22)=milliseconds and ((sn&4190208)>>12)=$2 then
setval($1,(milliseconds<<22)|($2<<12)|((sn&4095))) --sn nextval时已经加1
else
setval($1,(milliseconds<<22)|($2<<12)|1)
end)
from (select nextval($1) as sn,(extract(epoch from clock_timestamp())*1000)::bigint as milliseconds) as tmp;
$$ language sql;
2.2 创建生成snowflakeId函数(时间偏移从2021-01-01开始)
- 时间偏移从2021-01-01开始,最多可以使用至2090-09-07 15:47:35.551+08
drop function if exists generateIdOffset(regclass,int);
create or replace function generateIdOffset(regclass,imachineId int)
returns bigint
as $$
--1609430400000 =2021-01-01 00:00:00.000000
--4095 =0x0000000000000FFF
--4190208 =0x00000000003FF000
select
(case when (sn>>22)=milliseconds and ((sn&4190208)>>12)=$2 then
setval($1,(milliseconds<<22)|($2<<12)|((sn&4095))) --sn nextval时已经加1
else
setval($1,(milliseconds<<22)|($2<<12)|1)
end)
from (select nextval($1) as sn,(extract(epoch from clock_timestamp())*1000)::bigint-1609430400000 as milliseconds) as tmp;
$$ language sql;
3 测试
snowflakeId生成速度约90个/毫秒
3.1 uninx时间戳
select
id,id>>22,(id&4190208)>>12,id&4095
from(
select generateId('public.seq_1_table_id'::regclass,1) as id from generate_series(1,10000)
) as tmp;
3.2 时间偏移从2021-01-01开始
select
id,1609430400000 +(id>>22),(id&4190208)>>12,id&4095
from(
select generateIdOffset('public.seq_1_table_id'::regclass,1) as id from generate_series(1,10000)
) as tmp;
4 C语言结合PG Hasmap实现
snowflakeId生成速度约700个/毫秒/每核,是使用序列的10倍.
以8核心cpu为生成snowflakeId为例: 500* 1000 * 8 = 4000000个/秒,取500个/毫秒/每核的中间值排除锁\cpu切换时间.
/**
* 根据表OID和机器编号生成新的ID
* 生成速度约为每核心每毫秒700个id
*
* iRelId:表注册的id, 示例:'public.paramtest'::regclass::oid
* iMachineId:机器编号, 值范围从1 - select (1<<iMachineBinLen)-1
* iMachineBinLen:机器编号使用的字节数, iMachineBinLen+iSnBinLen必须小于等于22
* iSnBinLen:序列号使用的字节数, iMachineBinLen+iSnBinLen必须小于等于22
* imillisecondOffset:从1970-01-01偏移多少毫秒,只能是正数,计算时要加上此偏移量
*
* drop function if exists generate_snowflake_v3(Oid,integer,integer,integer,bigint);
* drop function if exists generateSnowflakeV3(Oid,integer,integer,integer,bigint);
*/
create function generateSnowflakeV3(
iRelId Oid,
iMachineId integer,
iMachineBinLen integer default 10,
iSnBinLen integer default 12,
iMillisecondOffset bigint default 1577836800000
) returns bigint
as 'pg_kmcb', 'generateSnowflakeV3'
language C strict;
Datum generateSnowflakeV3(PG_FUNCTION_ARGS) {
bool found;
Oid relid;
int64 time, millisecondOffset;
struct SeHashKey searchKey;
struct SeHashEntry* entry = NULL;
struct timeval tp;
int32 machineId, timeOffset, machineBinLen, snBinLen, sn;
if (NULL == pgState || NULL == pgHash)
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_kmb must be loaded via shared_preload_libraries")));
relid = PG_GETARG_OID(0);
machineId = PG_GETARG_INT32(1);
machineBinLen = PG_GETARG_INT32(2);
snBinLen = PG_GETARG_INT32(3);
millisecondOffset = PG_GETARG_INT64(4);
if (machineId < 1)
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("The parameter iMachineId must be greater than 1.")));
if (machineBinLen < 1)
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("The parameter iMachineBinLen must be greater than 1.")));
if (snBinLen < 1)
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("The parameter iSnBinLen must be greater than 1.")));
if (millisecondOffset < 0)
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("The parameter iMillisecondOffset must be greater than 0.")));
if (machineId > ((1 << machineBinLen) - 1))
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("The parameter iMachineId out of range.")));
timeOffset = (machineBinLen + snBinLen);
if (timeOffset != 22)
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("iMachineBinLen and iSnBinLen out of range(iMachineBinLen + iSnBinLen=22).")));
//获取时间
gettimeofday(&tp, NULL);
time = (
(((int64)tp.tv_sec) * INT64CONST(1000))
+ (((int64)tp.tv_usec) / INT64CONST(1000))
) - millisecondOffset;
//hash操作
searchKey.relid = relid;
searchKey.machineId = machineId;
LWLockAcquire(pgState->lock, LW_SHARED);
entry = (struct SeHashEntry*)hash_search(pgHash, &searchKey, HASH_FIND, NULL);
if (NULL == entry) {
LWLockRelease(pgState->lock);
LWLockAcquire(pgState->lock, LW_EXCLUSIVE);
entry = (struct SeHashEntry*)hash_search(pgHash, &searchKey, HASH_ENTER_NULL, &found);
if (!found) {
SpinLockInit(&entry->mutex);
SpinLockAcquire(&entry->mutex);
entry->millisecond = time;
entry->sn = 1;
SpinLockRelease(&entry->mutex);
sn = 1;
} else { /*永远不会执行*/
SpinLockAcquire(&entry->mutex);
sn = entry->sn;
SpinLockRelease(&entry->mutex);
}
} else {
SpinLockAcquire(&entry->mutex);
if (time == entry->millisecond) {
++entry->sn;
} else {
entry->millisecond = time;
entry->sn = 1;
}
sn = entry->sn;
SpinLockRelease(&entry->mutex);
}
LWLockRelease(pgState->lock);
PG_RETURN_INT64(
(time << timeOffset) |
(((int64)machineId) << snBinLen) |
(sn)
);
}
更多推荐
已为社区贡献1条内容
所有评论(0)