Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SUSPENDED: (PLAT-845) bootstrap and consensus uniproto, new branch #866

Open
wants to merge 18 commits into
base: network-transports
Choose a base branch
from
5 changes: 2 additions & 3 deletions ledger-core/cmd/pulsard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/insolar/assured-ledger/ledger-core/log/global"
"github.com/insolar/assured-ledger/ledger-core/metrics"
"github.com/insolar/assured-ledger/ledger-core/network/pulsenetwork"
"github.com/insolar/assured-ledger/ledger-core/network/transport"
"github.com/insolar/assured-ledger/ledger-core/pulsar"
"github.com/insolar/assured-ledger/ledger-core/pulsar/entropygenerator"
"github.com/insolar/assured-ledger/ledger-core/pulse"
Expand Down Expand Up @@ -122,15 +121,15 @@ func initPulsar(ctx context.Context, cfg configuration.PulsarConfiguration) (*co
cryptographyService := platformpolicy.NewCryptographyService()
keyProcessor := platformpolicy.NewKeyProcessor()

pulseDistributor, err := pulsenetwork.NewDistributor(cfg.Pulsar.PulseDistributor)
pulseDistributor, err := pulsenetwork.NewDistributor(cfg.Pulsar.PulseDistributor, pulsenetwork.NewPulsarUniserver())
if err != nil {
panic(err)
}

cm := component.NewManager(nil)
cm.SetLogger(global.Logger())

cm.Register(cryptographyScheme, keyStore, keyProcessor, transport.NewFactory(cfg.Pulsar.DistributionTransport))
cm.Register(cryptographyScheme, keyStore, keyProcessor)
cm.Inject(cryptographyService, pulseDistributor)

if err = cm.Init(ctx); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions ledger-core/cmd/testpulsard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/insolar/assured-ledger/ledger-core/instrumentation/inslogger"
"github.com/insolar/assured-ledger/ledger-core/log/global"
"github.com/insolar/assured-ledger/ledger-core/network/pulsenetwork"
"github.com/insolar/assured-ledger/ledger-core/network/transport"
"github.com/insolar/assured-ledger/ledger-core/pulsar"
"github.com/insolar/assured-ledger/ledger-core/pulsar/entropygenerator"
"github.com/insolar/assured-ledger/ledger-core/version"
Expand Down Expand Up @@ -99,15 +98,15 @@ func initPulsar(ctx context.Context, cfg configuration.PulsarConfiguration) *pul
cryptographyService := platformpolicy.NewCryptographyService()
keyProcessor := platformpolicy.NewKeyProcessor()

pulseDistributor, err := pulsenetwork.NewDistributor(cfg.Pulsar.PulseDistributor)
pulseDistributor, err := pulsenetwork.NewDistributor(cfg.Pulsar.PulseDistributor, pulsenetwork.NewPulsarUniserver())
if err != nil {
panic(err)
}

cm := component.NewManager(nil)
cm.SetLogger(global.Logger())

cm.Register(cryptographyScheme, keyStore, keyProcessor, transport.NewFactory(cfg.Pulsar.DistributionTransport))
cm.Register(cryptographyScheme, keyStore, keyProcessor)
cm.Inject(cryptographyService, pulseDistributor)

if err = cm.Init(ctx); err != nil {
Expand Down
50 changes: 50 additions & 0 deletions ledger-core/network/consensus/adapters/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"github.com/insolar/assured-ledger/ledger-core/instrumentation/inslogger"
"github.com/insolar/assured-ledger/ledger-core/log"
"github.com/insolar/assured-ledger/ledger-core/network/consensus/gcpv2/core/errors"
"github.com/insolar/assured-ledger/ledger-core/network/nds/uniproto"
"github.com/insolar/assured-ledger/ledger-core/network/nwapi"
"github.com/insolar/assured-ledger/ledger-core/pulse"
"github.com/insolar/assured-ledger/ledger-core/vanilla/iokit"
"github.com/insolar/assured-ledger/ledger-core/vanilla/throw"

"github.com/insolar/assured-ledger/ledger-core/instrumentation/insmetrics"
"github.com/insolar/assured-ledger/ledger-core/network/consensus/common/warning"
Expand Down Expand Up @@ -137,3 +142,48 @@ func (dh *DatagramHandler) HandleDatagram(ctx context.Context, address string, b

dh.packetHandler.handlePacket(ctx, packetParser, address)
}

var _ uniproto.Controller = &ConsensusProtocolMarshaller{}
var _ uniproto.Receiver = &ConsensusProtocolMarshaller{}

type ConsensusProtocolMarshaller struct {
HandlerAdapter *DatagramHandler
}

func (p *ConsensusProtocolMarshaller) Start(manager uniproto.PeerManager) {}
func (p *ConsensusProtocolMarshaller) NextPulse(p2 pulse.Range) {}
func (p *ConsensusProtocolMarshaller) Stop() {}

func (p *ConsensusProtocolMarshaller) PrepareHeader(_ *uniproto.Header, pn pulse.Number) (pulse.Number, error) {
return pn, nil
}

func (p *ConsensusProtocolMarshaller) VerifyHeader(*uniproto.Header, pulse.Number) error {
return nil
}

func (p *ConsensusProtocolMarshaller) ReceiveSmallPacket(packet *uniproto.ReceivedPacket, b []byte) {
p.HandlerAdapter.HandleDatagram(context.Background(), packet.From.String(), b[packet.GetPayloadOffset():len(b)-packet.GetSignatureSize()])
}

// nolint
func (p *ConsensusProtocolMarshaller) ReceiveLargePacket(packet *uniproto.ReceivedPacket, preRead []byte, r io.LimitedReader) error {
panic("ConsensusProtocolMarshaller unsupported ReceiveLargePacket")
}

func (p *ConsensusProtocolMarshaller) SerializeMsg(pt uniproto.ProtocolType, pkt uint8, pn pulse.Number, msg string) []byte {
packet := uniproto.NewSendingPacket(nil, nil)
packet.Header.SetProtocolType(pt)
packet.Header.SetPacketType(pkt)
packet.Header.SetRelayRestricted(true)
packet.PulseNumber = pn

b, err := packet.SerializeToBytes(uint(len(msg)), func(_ nwapi.SerializationContext, _ *uniproto.Packet, w *iokit.LimitedWriter) error {
_, err := w.Write([]byte(msg))
return err
})
if err != nil {
panic(throw.ErrorWithStack(err))
}
return b
}
36 changes: 36 additions & 0 deletions ledger-core/network/consensus/adapters/preparer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package adapters

import (
"io"

"github.com/insolar/assured-ledger/ledger-core/network/nds/uniproto"
"github.com/insolar/assured-ledger/ledger-core/network/nwapi"
"github.com/insolar/assured-ledger/ledger-core/pulse"
"github.com/insolar/assured-ledger/ledger-core/vanilla/iokit"
)

var _ uniproto.ProtocolPacket = &ConsensusPacket{}

type ConsensusPacket struct {
Payload []byte
}

func (p *ConsensusPacket) PreparePacket() (uniproto.PacketTemplate, uint, uniproto.PayloadSerializerFunc) {
pt := uniproto.PacketTemplate{}
pt.Header.SetRelayRestricted(true)
pt.Header.SetProtocolType(uniproto.ProtocolTypePulsar)
pt.PulseNumber = pulse.MinTimePulse
return pt, uint(len(p.Payload)), p.SerializePayload
}

func (p *ConsensusPacket) SerializePayload(_ nwapi.SerializationContext, _ *uniproto.Packet, writer *iokit.LimitedWriter) error {
_, err := writer.Write(p.Payload)
return err
}

func (p *ConsensusPacket) DeserializePayload(_ nwapi.DeserializationContext, _ *uniproto.Packet, reader *iokit.LimitedReader) error {
b := make([]byte, reader.RemainingBytes())
_, err := io.ReadFull(reader, b)
p.Payload = b
return err
}
7 changes: 3 additions & 4 deletions ledger-core/network/consensus/adapters/tests/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/insolar/assured-ledger/ledger-core/network/gateway"
"github.com/insolar/assured-ledger/ledger-core/network/mandates"
"github.com/insolar/assured-ledger/ledger-core/network/nodeinfo"
"github.com/insolar/assured-ledger/ledger-core/network/transport"
"github.com/insolar/assured-ledger/ledger-core/testutils/gen"
"github.com/insolar/assured-ledger/ledger-core/testutils/network/mutable"
"github.com/insolar/assured-ledger/ledger-core/vanilla/cryptkit"
Expand Down Expand Up @@ -146,9 +145,9 @@ func initNodes(ctx context.Context, mode consensus.Mode, nodes GeneratedNodes, s
ns.transports[i] = delayTransport

controller := consensus.New(ctx, consensus.Dep{
KeyProcessor: keyProcessor,
CertificateManager: certificateManager,
KeyStore: keystore.NewInplaceKeyStore(nodes.meta[i].privateKey),
KeyProcessor: keyProcessor,
CertificateManager: certificateManager,
KeyStore: keystore.NewInplaceKeyStore(nodes.meta[i].privateKey),
TransportCryptography: adapters.NewTransportCryptographyFactory(scheme),

NodeKeeper: nodeKeeper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/insolar/assured-ledger/ledger-core/instrumentation/inslogger"
"github.com/insolar/assured-ledger/ledger-core/network/transport"
)

type NetStrategy interface {
Expand Down
1 change: 0 additions & 1 deletion ledger-core/network/consensus/adapters/tests/pulse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/insolar/assured-ledger/ledger-core/cryptography/platformpolicy"
"github.com/insolar/assured-ledger/ledger-core/network/consensus/serialization"
"github.com/insolar/assured-ledger/ledger-core/network/transport"
"github.com/insolar/assured-ledger/ledger-core/pulse"
"github.com/insolar/assured-ledger/ledger-core/vanilla/cryptkit"
"github.com/insolar/assured-ledger/ledger-core/vanilla/longbits"
Expand Down
27 changes: 19 additions & 8 deletions ledger-core/network/consensus/adapters/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,41 @@ package adapters
import (
"context"

transport2 "github.com/insolar/assured-ledger/ledger-core/network/consensus/gcpv2/api/transport"
"github.com/insolar/assured-ledger/ledger-core/network/consensus/gcpv2/api/transport"
"github.com/insolar/assured-ledger/ledger-core/network/nds/uniproto"
"github.com/insolar/assured-ledger/ledger-core/network/nds/uniproto/l2/uniserver"
"github.com/insolar/assured-ledger/ledger-core/network/nwapi"

"github.com/insolar/assured-ledger/ledger-core/instrumentation/inslogger"
"github.com/insolar/assured-ledger/ledger-core/network/transport"
)

type PacketSender struct {
datagramTransport transport.DatagramTransport
unifiedServer *uniserver.UnifiedServer
}

func NewPacketSender(datagramTransport transport.DatagramTransport) *PacketSender {
func NewPacketSender(unifiedServer *uniserver.UnifiedServer) *PacketSender {
return &PacketSender{
datagramTransport: datagramTransport,
unifiedServer: unifiedServer,
}
}

func (ps *PacketSender) SendPacketToTransport(ctx context.Context, to transport2.TargetProfile, sendOptions transport2.PacketSendOptions, payload interface{}) {
func (ps *PacketSender) SendPacketToTransport(ctx context.Context, to transport.TargetProfile, sendOptions transport.PacketSendOptions, payload interface{}) {
addr := to.GetStatic().GetDefaultEndpoint().GetIPAddress().String()

ctx, logger := inslogger.WithFields(ctx, map[string]interface{}{
_, logger := inslogger.WithFields(ctx, map[string]interface{}{
"receiver_addr": addr,
})

err := ps.datagramTransport.SendDatagram(ctx, addr, payload.([]byte))
// nwapi.NewHostAndPort()
peerAddr := nwapi.NewHostPort(addr, false)

peer, err := ps.unifiedServer.PeerManager().Manager().ConnectPeer(peerAddr)
if err != nil {
logger.Error("Failed to connect to peer: ", err)
}

packet := &ConsensusPacket{Payload: payload.([]byte)}
err = peer.SendPacket(uniproto.SessionlessNoQuota, packet)
if err != nil {
logger.Error("Failed to send datagram: ", err)
}
Expand Down
10 changes: 5 additions & 5 deletions ledger-core/network/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/insolar/assured-ledger/ledger-core/network/consensus/gcpv2/core"
"github.com/insolar/assured-ledger/ledger-core/network/consensus/gcpv2/core/coreapi"
"github.com/insolar/assured-ledger/ledger-core/network/consensus/serialization"
"github.com/insolar/assured-ledger/ledger-core/network/nds/uniproto/l2/uniserver"
"github.com/insolar/assured-ledger/ledger-core/network/nodeinfo"
"github.com/insolar/assured-ledger/ledger-core/network/transport"
"github.com/insolar/assured-ledger/ledger-core/vanilla/cryptkit"
"github.com/insolar/assured-ledger/ledger-core/vanilla/longbits"
)
Expand Down Expand Up @@ -71,15 +71,15 @@ type Dep struct {
KeyStore cryptography.KeyStore
TransportCryptography transport2.CryptographyAssistant

NodeKeeper beat.NodeKeeper
DatagramTransport transport.DatagramTransport
NodeKeeper beat.NodeKeeper
UnifiedServer *uniserver.UnifiedServer

StateGetter adapters.NodeStater
PulseChanger adapters.BeatChanger
StateUpdater adapters.StateUpdater
EphemeralController adapters.EphemeralController

LocalNodeProfile profiles.StaticProfile
LocalNodeProfile profiles.StaticProfile
}

func (cd *Dep) verify() {
Expand Down Expand Up @@ -134,7 +134,7 @@ func newConstructor(ctx context.Context, dep *Dep) *constructor {
c.transportCryptographyFactory,
c.localNodeConfiguration,
)
c.packetSender = adapters.NewPacketSender(dep.DatagramTransport)
c.packetSender = adapters.NewPacketSender(dep.UnifiedServer)
c.transportFactory = adapters.NewTransportFactory(
c.transportCryptographyFactory,
c.packetBuilder,
Expand Down
Loading