Skip to content
This repository has been archived by the owner on Jun 13, 2019. It is now read-only.

Commit

Permalink
Add Operator Cron jobs: SubmitBlock, ApplyDesposit
Browse files Browse the repository at this point in the history
Operator needs to have some cron jobs.
This commit adds two cron jobs that should be run by operator.
First one is a job that submits a block every certain time.
Second one is to move "apply deposit" job, which listens to
root chain deposit event, from ChildChain to operator cron job.
Reason for that is that we want ChildChain server to be scalable.
Current design would lead to single node restriction.

This commit does NOT include integration tests code change.
  • Loading branch information
boolafish authored and bun919tw committed Aug 26, 2018
1 parent 9537e20 commit 9578790
Show file tree
Hide file tree
Showing 31 changed files with 394 additions and 89 deletions.
2 changes: 2 additions & 0 deletions .isort.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[settings]
line_length = 100
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,17 @@ Deploy contract:
python deployment.py
```

Run child chain:
Run child chain Server:
```
python -m plasma_cash.child_chain
```

Run operator cron jobs:
(TODO: the following commands does not support running with cron job yet)
```
python -m plasma_cash.operator_cron_job
```

Client:
```
python
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/features/environment.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import signal
import time
from subprocess import Popen, PIPE
from subprocess import PIPE, Popen

from plasma_cash.root_chain.deployer import Deployer

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import rlp
import time

import rlp
from behave import given, then, when
from ethereum import utils

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/features/steps/challenge_history_flow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import rlp
import time

import rlp
from behave import given, then, when

from integration_tests.features.utils import has_value
Expand Down
3 changes: 2 additions & 1 deletion plasma_cash/child_chain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def create_app(is_unit_test=False):
container.get_child_chain()

from . import server
app.register_blueprint(server.bp)
app.register_blueprint(server.api)
app.register_blueprint(server.operator, url_prefix='/operator')

return app
6 changes: 0 additions & 6 deletions plasma_cash/child_chain/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ class Block(rlp.Serializable):
]

def __init__(self, transaction_set=None):
# There's a weird bug that when using
# def __init__(self, transaction_set=[],...)
# `transaction_set` would sometimes NOT be an empty list
# this happens after calling `add_tx(tx)`
# whenever new a Block(), the transaction_set would not be empty
# as a result, use if None statement to enforce empty list instead
if transaction_set is None:
transaction_set = []

Expand Down
37 changes: 25 additions & 12 deletions plasma_cash/child_chain/child_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

from plasma_cash.utils.utils import get_sender

from .event import emit
from .block import Block
from .exceptions import (InvalidBlockNumException,
InvalidBlockSignatureException,
InvalidTxSignatureException,
from .event import emit
from .exceptions import (DepositAlreadyAppliedException, InvalidBlockNumException,
InvalidBlockSignatureException, InvalidTxSignatureException,
PreviousTxNotFoundException, TxAlreadySpentException,
TxAmountMismatchException, TxWithSameUidAlreadyExists)
from .transaction import Transaction
Expand All @@ -26,23 +25,37 @@ def __init__(self, authority, root_chain, db):
self.current_block = Block()
self.current_block_number = self.db.get_current_block_num()

# Register a filter for deposit event
"""
TODO: should be removed as there's operator cron job that's doing the job
here. Temperary keep this to not break integration test.
"""
deposit_filter = self.root_chain.eventFilter('Deposit', {'fromBlock': 0})
worker = Thread(target=self.log_loop, args=(deposit_filter, 0.1), daemon=True)
worker.start()

def log_loop(self, event_filter, poll_interval):
"""
TODO: should be removed as there's operator cron job that's doing the job
here. Temperary keep this to not break integration test.
"""
while True:
for event in event_filter.get_new_entries():
self.apply_deposit(event)
depositor = event['args']['depositor']
amount = event['args']['amount']
uid = event['args']['uid']
self.apply_deposit(depositor, amount, uid)
time.sleep(poll_interval)

def apply_deposit(self, event):
new_owner = utils.normalize_address(event['args']['depositor'])
amount = event['args']['amount']
uid = event['args']['uid']
deposit_tx = Transaction(0, uid, amount, new_owner)
self.current_block.add_tx(deposit_tx)
def apply_deposit(self, depositor, amount, uid):
new_owner = utils.normalize_address(depositor)

if not self.current_block.get_tx_by_uid(uid):
deposit_tx = Transaction(0, uid, amount, new_owner)
self.current_block.add_tx(deposit_tx)
return deposit_tx.hash

err_msg = 'deposit of uid: {} is already applied previously'.format(uid)
raise DepositAlreadyAppliedException(err_msg)

def submit_block(self, sig):
signature = bytes.fromhex(sig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ def get_proof(self, blknum, uid):
response = self.request(end_point, 'GET', params=params)
return response.text

def send_transaction(self, tx):
end_point = '/send_tx'
data = {'tx': tx}
self.request(end_point, 'POST', data=data)

def submit_block(self, sig):
end_point = '/submit_block'
end_point = '/operator/submit_block'
data = {'sig': sig}
self.request(end_point, 'POST', data=data)

def send_transaction(self, tx):
end_point = '/send_tx'
data = {'tx': tx}
def apply_deposit(self, depositor, amount, uid):
end_point = '/operator/apply_deposit'
data = {'depositor': depositor, 'amount': amount, 'uid': uid}
self.request(end_point, 'POST', data=data)
8 changes: 8 additions & 0 deletions plasma_cash/child_chain/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,11 @@ class InvalidBlockNumException(Exception):

class TxWithSameUidAlreadyExists(Exception):
"""the block already has one tx with the uid"""


class RequestFailedException(Exception):
"""request failed without success http status"""


class DepositAlreadyAppliedException(Exception):
"""the deposit is already applied"""
36 changes: 23 additions & 13 deletions plasma_cash/child_chain/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,39 @@

from flask import Blueprint, request

from plasma_cash.child_chain import websocket, event
from plasma_cash.child_chain import event, websocket
from plasma_cash.dependency_config import container

bp = Blueprint('api', __name__)
api = Blueprint('api', __name__)
operator = Blueprint('operator', __name__)

clients = {}


@bp.route('/block', methods=['GET'])
@api.route('/block', methods=['GET'])
def get_current_block():
return container.get_child_chain().get_current_block()


@bp.route('/block/<blknum>', methods=['GET'])
@api.route('/block/<blknum>', methods=['GET'])
def get_block(blknum):
return container.get_child_chain().get_block(int(blknum))


@bp.route('/proof', methods=['GET'])
@api.route('/proof', methods=['GET'])
def get_proof():
blknum = int(request.args.get('blknum'))
uid = int(request.args.get('uid'))
return container.get_child_chain().get_proof(blknum, uid)


@bp.route('/submit_block', methods=['POST'])
def submit_block():
sig = request.form['sig']
return container.get_child_chain().submit_block(sig)


@bp.route('/send_tx', methods=['POST'])
@api.route('/send_tx', methods=['POST'])
def send_tx():
tx = request.form['tx']
return container.get_child_chain().apply_transaction(tx)


@bp.route('/', methods=['GET'])
@api.route('/', methods=['GET'])
def root():
global clients

Expand All @@ -52,6 +48,20 @@ def root():
return ''


@operator.route('/submit_block', methods=['POST'])
def submit_block():
sig = request.form['sig']
return container.get_child_chain().submit_block(sig)


@operator.route('/apply_deposit', methods=['POST'])
def apply_deposit():
depositor = request.form['depositor']
amount = int(request.form['amount'])
uid = int(request.form['uid'])
return container.get_child_chain().apply_deposit(depositor, amount, uid)


@event.on('websocket.join')
def join(ws, arg):
clients[arg] = ws
Expand Down
4 changes: 0 additions & 4 deletions plasma_cash/client/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
class RequestFailedException(Exception):
"""request failed without success http status"""


class TxHistoryNotFoundException(Exception):
"""tx history is not found for specific block number"""
2 changes: 1 addition & 1 deletion plasma_cash/dependency_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from plasma_cash.child_chain.child_chain import ChildChain
from plasma_cash.child_chain.db.leveldb import LevelDb
from plasma_cash.child_chain.db.memory_db import MemoryDb
from plasma_cash.client.child_chain_client import ChildChainClient
from plasma_cash.child_chain.child_chain_client import ChildChainClient
from plasma_cash.config import PROJECT_DIR, db_config, plasma_config
from plasma_cash.root_chain.deployer import Deployer

Expand Down
Empty file.
28 changes: 28 additions & 0 deletions plasma_cash/operator_cron_job/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from plasma_cash.config import plasma_config
from plasma_cash.dependency_config import container

from .job_handler import JobHandler
from .jobs.apply_deposit_job import ApplyDepositJob
from .jobs.submit_block_job import SubmitBlockJob

SUBMIT_BLOCK_INTERVAL = 5
APPLY_DEPOSIT_INTERVAL = 1


def setup_job_handler(job_handler):
root_chain = container.get_root_chain()
child_chain_client = container.get_child_chain_client()

apply_deposit_job = ApplyDepositJob(root_chain, child_chain_client)
submit_block_job = SubmitBlockJob(child_chain_client, plasma_config['AUTHORITY_KEY'])

job_handler.add_job(submit_block_job, time_interval=SUBMIT_BLOCK_INTERVAL)
job_handler.add_job(apply_deposit_job, time_interval=APPLY_DEPOSIT_INTERVAL)

return job_handler


if __name__ == '__main__':
job_handler = JobHandler()
job_handler = setup_job_handler(job_handler)
job_handler.start()
20 changes: 20 additions & 0 deletions plasma_cash/operator_cron_job/job_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import time
from threading import Thread


class JobHandler(object):
def __init__(self):
self.workers = []

def add_job(self, job, time_interval):
worker = Thread(target=self._schedule_the_job, args=(job, time_interval,), daemon=True)
self.workers.append(worker)

def start(self):
for worker in self.workers:
worker.start()

def _schedule_the_job(self, job, time_interval):
while True:
job.run()
time.sleep(time_interval)
Empty file.
15 changes: 15 additions & 0 deletions plasma_cash/operator_cron_job/jobs/apply_deposit_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from .job_interface import JobInterface


class ApplyDepositJob(JobInterface):

def __init__(self, root_chain, child_chain):
self.child_chain = child_chain
self.deposit_filter = root_chain.eventFilter('Deposit', {'fromBlock': 0})

def run(self):
for event in self.deposit_filter.get_new_entries():
depositor = event['args']['depositor']
amount = event['args']['amount']
uid = event['args']['uid']
self.child_chain.apply_deposit(depositor, amount, uid)
8 changes: 8 additions & 0 deletions plasma_cash/operator_cron_job/jobs/job_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import abc


class JobInterface(abc.ABC):

@abc.abstractmethod
def run(self):
return NotImplemented
23 changes: 23 additions & 0 deletions plasma_cash/operator_cron_job/jobs/submit_block_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import rlp
from ethereum import utils

from plasma_cash.child_chain.block import Block
from plasma_cash.utils.utils import sign

from .job_interface import JobInterface


class SubmitBlockJob(JobInterface):

def __init__(self, child_chain, key):
self.child_chain = child_chain
self.key = utils.normalize_key(key)

def run(self):
block = self._get_current_block()
sig = sign(block.hash, self.key)
self.child_chain.submit_block(sig.hex())

def _get_current_block(self):
block = self.child_chain.get_current_block()
return rlp.decode(utils.decode_hex(block), Block)
Loading

0 comments on commit 9578790

Please sign in to comment.