148 lines
3.5 KiB
Go
148 lines
3.5 KiB
Go
package etcd
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"path"
|
|
"sync"
|
|
|
|
"git.nobla.cn/golang/aeus/pkg/errors"
|
|
"git.nobla.cn/golang/aeus/registry"
|
|
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
)
|
|
|
|
type etcdRegistry struct {
|
|
prefix string
|
|
client *clientv3.Client
|
|
options *registry.RegistryOptions
|
|
leaseID clientv3.LeaseID
|
|
sync.RWMutex
|
|
}
|
|
|
|
func (e *etcdRegistry) prefixKey(name string) string {
|
|
return path.Join(e.prefix, name) + "/"
|
|
}
|
|
|
|
func (e *etcdRegistry) buildKey(s *registry.Service) string {
|
|
return path.Join(e.prefix, s.Name, s.ID)
|
|
}
|
|
|
|
func (e *etcdRegistry) marshalService(s *registry.Service) string {
|
|
buf, err := json.Marshal(s)
|
|
if err == nil {
|
|
return string(buf)
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (e *etcdRegistry) unmarshalService(b []byte) (*registry.Service, error) {
|
|
var service registry.Service
|
|
err := json.Unmarshal(b, &service)
|
|
if err == nil {
|
|
return &service, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
func (e *etcdRegistry) Name() string {
|
|
return Name
|
|
}
|
|
|
|
func (e *etcdRegistry) Init(opts ...registry.RegistryOption) (err error) {
|
|
for _, cb := range opts {
|
|
cb(e.options)
|
|
}
|
|
cfg := clientv3.Config{}
|
|
if e.options.Context != nil {
|
|
clientOpts := FromContext(e.options.Context)
|
|
if clientOpts != nil {
|
|
cfg.Username = clientOpts.Username
|
|
cfg.Password = clientOpts.Password
|
|
cfg.DialTimeout = clientOpts.DialTimeout
|
|
if clientOpts.Prefix != "" {
|
|
e.prefix = clientOpts.Prefix
|
|
}
|
|
}
|
|
}
|
|
if e.options.TLSConfig != nil {
|
|
cfg.TLS = e.options.TLSConfig
|
|
} else if e.options.Secure {
|
|
cfg.TLS = &tls.Config{
|
|
InsecureSkipVerify: true,
|
|
}
|
|
}
|
|
if e.prefix == "" {
|
|
e.prefix = "/micro/service/"
|
|
}
|
|
cfg.Endpoints = append(cfg.Endpoints, e.options.Addrs...)
|
|
e.client, err = clientv3.New(cfg)
|
|
return nil
|
|
}
|
|
|
|
func (e *etcdRegistry) Register(s *registry.Service, cbs ...registry.RegisterOption) (err error) {
|
|
var lgr *clientv3.LeaseGrantResponse
|
|
opts := registry.NewRegisterOption(cbs...)
|
|
if opts.TTL.Seconds() > 0 {
|
|
if e.leaseID > 0 {
|
|
if _, err = e.client.KeepAliveOnce(opts.Context, e.leaseID); err == nil {
|
|
return
|
|
} else {
|
|
if err != rpctypes.ErrLeaseNotFound {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
lgr, err = e.client.Grant(opts.Context, int64(opts.TTL.Seconds()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if lgr != nil {
|
|
if _, err = e.client.Put(opts.Context, e.buildKey(s), e.marshalService(s), clientv3.WithLease(lgr.ID)); err == nil {
|
|
e.leaseID = lgr.ID
|
|
}
|
|
} else {
|
|
_, err = e.client.Put(opts.Context, e.buildKey(s), e.marshalService(s))
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *etcdRegistry) Deregister(s *registry.Service, cbs ...registry.DeregisterOption) error {
|
|
opts := registry.NewDeregisterOption(cbs...)
|
|
e.client.Delete(opts.Context, e.buildKey(s))
|
|
return nil
|
|
}
|
|
|
|
func (e *etcdRegistry) GetService(name string, cbs ...registry.GetOption) ([]*registry.Service, error) {
|
|
opts := registry.NewGetOption(cbs...)
|
|
rsp, err := e.client.Get(opts.Context, e.prefixKey(name), clientv3.WithPrefix(), clientv3.WithSerializable())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(rsp.Kvs) == 0 {
|
|
return nil, errors.ErrNotFound
|
|
}
|
|
ss := make([]*registry.Service, 0, len(rsp.Kvs))
|
|
for _, n := range rsp.Kvs {
|
|
if s, err := e.unmarshalService(n.Value); err == nil {
|
|
ss = append(ss, s)
|
|
}
|
|
}
|
|
return ss, nil
|
|
}
|
|
|
|
func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
|
return newWatcher(e, opts...)
|
|
}
|
|
|
|
func NewEtcdRegistry() registry.Registry {
|
|
e := &etcdRegistry{
|
|
options: ®istry.RegistryOptions{},
|
|
}
|
|
return e
|
|
}
|