Go inheritance for Task handling
I am writing a Go slave program that listens on an AQMP message queue for tasks. These tasks contain input data and output data, json encoded.
Different tasks can have completely different input/output structures.
I am trying to find a way to write my registerTask function such that I can re-use that function for multiple tasks and encode JSON to the wanted Input structure. However, the JSON parser should throw an error if the input does not match the specific task input. I tried with accepting interface{}, but that allows anything and makes "foo" validate against any custom struct. Ideas?
This whole composition thing is new to me. In PHP, I would solve this with an abstract base class.
My code so far to register new Tasks:
func sendResult(chann *amqp.Channel, task tasks.TaskImpl, output tasks.TaskOutput) error {
log.Printf("Sending response for %s: %v", task.GetName(), output)
jsonResponse, err := json.Marshal(output)
if err != nil {
log.Printf("Could not encode task %s to JSON: %v", task.GetName(), output)
return err
}
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: "text/plain",
Body: jsonResponse,
}
return chann.Publish(QUEUE_RESULT, task.GetName(), false, false, msg)
}
func registerTask(chann *amqp.Channel, task tasks.TaskImpl) {
log.Println("Registering task: " + task.GetName())
deliverChann, err := chann.Consume(QUEUE_TODO, task.GetName(), false, false, false, false, nil)
if err != nil {
log.Fatalf("Could not consume: %v", err)
}
go func() {
for taskMsg := range deliverChann {
log.Printf("Task received: %v", taskMsg)
// check if it's in UTF-8
if strings.ToLower(taskMsg.ContentEncoding) != "utf-8" {
log.Printf("Warning: task %s is not encoded in utf-8 but: %s", task.GetName(), taskMsg.ContentEncoding)
taskMsg.Reject(false)
}
// check if it's JSON
if strings.ToLower(taskMsg.ContentType) != "application/json" {
log.Printf("Warning: task %s is not json but: %s", task.GetName(), taskMsg.ContentType)
taskMsg.Reject(false)
}
log.Printf("Decoding %v", taskMsg.Body)
if err := json.Unmarshal(taskMsg.Body, &task; err != nil {
log.Printf("Could not decode task %s body: %v", task.GetName(), err)
taskMsg.Reject(false)
continue
}
log.Printf("Executing %s with args %v (from %v)", task.GetName(), task, taskMsg.Body)
output, err := task.Process(input)
if err != nil {
log.Printf("Task '%s' error: %s", task.GetName(), err)
taskMsg.Reject(true)
continue
}
if err := sendResult(chann, task, output); err != nil {
log.Printf("Could not send result for task: " + task.GetName())
taskMsg.Reject(true)
continue
}
if err := taskMsg.Ack(false); err != nil {
log.Println("Could not acknowledge: " + err.Error())
}
}
}()
}
And my task code:
type TaskInput struct{}
type TaskOutput struct{}
type TaskData interface {
Input TaskInput `json:"input"`
Output TaskOutput `json:"output"`
}
type TaskImpl interface {
GetName() string
Process(msg *TaskData) (*TaskData, error)
}
type SampleTaskInput struct {
TaskInput
Arg string
}
type SampleTaskOutput struct {
TaskOutput
Result string
}
type SampleTask struct {
TaskImpl
}
func (SampleTask) GetName() string {
return "sample"
}
func (SampleTask) Process(msg *TaskData) (*TaskData, error){
log.Printf("Executing sampletask with arg: %v", msg.Input)
msg.Output = SampleTaskOutput{}
return SampleTaskOutput{
Result: "String was " + data.Arg,
}, nil
}
|
show 2 more comments
I am writing a Go slave program that listens on an AQMP message queue for tasks. These tasks contain input data and output data, json encoded.
Different tasks can have completely different input/output structures.
I am trying to find a way to write my registerTask function such that I can re-use that function for multiple tasks and encode JSON to the wanted Input structure. However, the JSON parser should throw an error if the input does not match the specific task input. I tried with accepting interface{}, but that allows anything and makes "foo" validate against any custom struct. Ideas?
This whole composition thing is new to me. In PHP, I would solve this with an abstract base class.
My code so far to register new Tasks:
func sendResult(chann *amqp.Channel, task tasks.TaskImpl, output tasks.TaskOutput) error {
log.Printf("Sending response for %s: %v", task.GetName(), output)
jsonResponse, err := json.Marshal(output)
if err != nil {
log.Printf("Could not encode task %s to JSON: %v", task.GetName(), output)
return err
}
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: "text/plain",
Body: jsonResponse,
}
return chann.Publish(QUEUE_RESULT, task.GetName(), false, false, msg)
}
func registerTask(chann *amqp.Channel, task tasks.TaskImpl) {
log.Println("Registering task: " + task.GetName())
deliverChann, err := chann.Consume(QUEUE_TODO, task.GetName(), false, false, false, false, nil)
if err != nil {
log.Fatalf("Could not consume: %v", err)
}
go func() {
for taskMsg := range deliverChann {
log.Printf("Task received: %v", taskMsg)
// check if it's in UTF-8
if strings.ToLower(taskMsg.ContentEncoding) != "utf-8" {
log.Printf("Warning: task %s is not encoded in utf-8 but: %s", task.GetName(), taskMsg.ContentEncoding)
taskMsg.Reject(false)
}
// check if it's JSON
if strings.ToLower(taskMsg.ContentType) != "application/json" {
log.Printf("Warning: task %s is not json but: %s", task.GetName(), taskMsg.ContentType)
taskMsg.Reject(false)
}
log.Printf("Decoding %v", taskMsg.Body)
if err := json.Unmarshal(taskMsg.Body, &task; err != nil {
log.Printf("Could not decode task %s body: %v", task.GetName(), err)
taskMsg.Reject(false)
continue
}
log.Printf("Executing %s with args %v (from %v)", task.GetName(), task, taskMsg.Body)
output, err := task.Process(input)
if err != nil {
log.Printf("Task '%s' error: %s", task.GetName(), err)
taskMsg.Reject(true)
continue
}
if err := sendResult(chann, task, output); err != nil {
log.Printf("Could not send result for task: " + task.GetName())
taskMsg.Reject(true)
continue
}
if err := taskMsg.Ack(false); err != nil {
log.Println("Could not acknowledge: " + err.Error())
}
}
}()
}
And my task code:
type TaskInput struct{}
type TaskOutput struct{}
type TaskData interface {
Input TaskInput `json:"input"`
Output TaskOutput `json:"output"`
}
type TaskImpl interface {
GetName() string
Process(msg *TaskData) (*TaskData, error)
}
type SampleTaskInput struct {
TaskInput
Arg string
}
type SampleTaskOutput struct {
TaskOutput
Result string
}
type SampleTask struct {
TaskImpl
}
func (SampleTask) GetName() string {
return "sample"
}
func (SampleTask) Process(msg *TaskData) (*TaskData, error){
log.Printf("Executing sampletask with arg: %v", msg.Input)
msg.Output = SampleTaskOutput{}
return SampleTaskOutput{
Result: "String was " + data.Arg,
}, nil
}
At the time the call toregisterTaskis made, do you know the type of the task? Or are you passing justnilas the second argument?
– mkopriva
Nov 23 '18 at 12:38
If you don't know the type of the task before reading the body, you'll have to first read the body, inspect the name, and then based on the name initialize the task implementation you need. For example something like this play.golang.com/p/le0iGku58iE
– mkopriva
Nov 23 '18 at 12:50
@mkopriva registerTask is called at initialization andchann.Consume(QUEUE_TODO, task.GetName()...will get the task todos. Every separate task type will launch a separate gofunc.
– nindustries
Nov 23 '18 at 14:48
In your example code your referencinginputon this lineoutput, err := task.Process(input), but it's not being declared anywhere, where is that coming from, what is it's type?
– mkopriva
Nov 23 '18 at 14:59
Ahh, my fault, that should be a reference to TaskData. I've been refactoring a lot to make this compile and break it again..
– nindustries
Nov 23 '18 at 15:00
|
show 2 more comments
I am writing a Go slave program that listens on an AQMP message queue for tasks. These tasks contain input data and output data, json encoded.
Different tasks can have completely different input/output structures.
I am trying to find a way to write my registerTask function such that I can re-use that function for multiple tasks and encode JSON to the wanted Input structure. However, the JSON parser should throw an error if the input does not match the specific task input. I tried with accepting interface{}, but that allows anything and makes "foo" validate against any custom struct. Ideas?
This whole composition thing is new to me. In PHP, I would solve this with an abstract base class.
My code so far to register new Tasks:
func sendResult(chann *amqp.Channel, task tasks.TaskImpl, output tasks.TaskOutput) error {
log.Printf("Sending response for %s: %v", task.GetName(), output)
jsonResponse, err := json.Marshal(output)
if err != nil {
log.Printf("Could not encode task %s to JSON: %v", task.GetName(), output)
return err
}
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: "text/plain",
Body: jsonResponse,
}
return chann.Publish(QUEUE_RESULT, task.GetName(), false, false, msg)
}
func registerTask(chann *amqp.Channel, task tasks.TaskImpl) {
log.Println("Registering task: " + task.GetName())
deliverChann, err := chann.Consume(QUEUE_TODO, task.GetName(), false, false, false, false, nil)
if err != nil {
log.Fatalf("Could not consume: %v", err)
}
go func() {
for taskMsg := range deliverChann {
log.Printf("Task received: %v", taskMsg)
// check if it's in UTF-8
if strings.ToLower(taskMsg.ContentEncoding) != "utf-8" {
log.Printf("Warning: task %s is not encoded in utf-8 but: %s", task.GetName(), taskMsg.ContentEncoding)
taskMsg.Reject(false)
}
// check if it's JSON
if strings.ToLower(taskMsg.ContentType) != "application/json" {
log.Printf("Warning: task %s is not json but: %s", task.GetName(), taskMsg.ContentType)
taskMsg.Reject(false)
}
log.Printf("Decoding %v", taskMsg.Body)
if err := json.Unmarshal(taskMsg.Body, &task; err != nil {
log.Printf("Could not decode task %s body: %v", task.GetName(), err)
taskMsg.Reject(false)
continue
}
log.Printf("Executing %s with args %v (from %v)", task.GetName(), task, taskMsg.Body)
output, err := task.Process(input)
if err != nil {
log.Printf("Task '%s' error: %s", task.GetName(), err)
taskMsg.Reject(true)
continue
}
if err := sendResult(chann, task, output); err != nil {
log.Printf("Could not send result for task: " + task.GetName())
taskMsg.Reject(true)
continue
}
if err := taskMsg.Ack(false); err != nil {
log.Println("Could not acknowledge: " + err.Error())
}
}
}()
}
And my task code:
type TaskInput struct{}
type TaskOutput struct{}
type TaskData interface {
Input TaskInput `json:"input"`
Output TaskOutput `json:"output"`
}
type TaskImpl interface {
GetName() string
Process(msg *TaskData) (*TaskData, error)
}
type SampleTaskInput struct {
TaskInput
Arg string
}
type SampleTaskOutput struct {
TaskOutput
Result string
}
type SampleTask struct {
TaskImpl
}
func (SampleTask) GetName() string {
return "sample"
}
func (SampleTask) Process(msg *TaskData) (*TaskData, error){
log.Printf("Executing sampletask with arg: %v", msg.Input)
msg.Output = SampleTaskOutput{}
return SampleTaskOutput{
Result: "String was " + data.Arg,
}, nil
}
I am writing a Go slave program that listens on an AQMP message queue for tasks. These tasks contain input data and output data, json encoded.
Different tasks can have completely different input/output structures.
I am trying to find a way to write my registerTask function such that I can re-use that function for multiple tasks and encode JSON to the wanted Input structure. However, the JSON parser should throw an error if the input does not match the specific task input. I tried with accepting interface{}, but that allows anything and makes "foo" validate against any custom struct. Ideas?
This whole composition thing is new to me. In PHP, I would solve this with an abstract base class.
My code so far to register new Tasks:
func sendResult(chann *amqp.Channel, task tasks.TaskImpl, output tasks.TaskOutput) error {
log.Printf("Sending response for %s: %v", task.GetName(), output)
jsonResponse, err := json.Marshal(output)
if err != nil {
log.Printf("Could not encode task %s to JSON: %v", task.GetName(), output)
return err
}
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: "text/plain",
Body: jsonResponse,
}
return chann.Publish(QUEUE_RESULT, task.GetName(), false, false, msg)
}
func registerTask(chann *amqp.Channel, task tasks.TaskImpl) {
log.Println("Registering task: " + task.GetName())
deliverChann, err := chann.Consume(QUEUE_TODO, task.GetName(), false, false, false, false, nil)
if err != nil {
log.Fatalf("Could not consume: %v", err)
}
go func() {
for taskMsg := range deliverChann {
log.Printf("Task received: %v", taskMsg)
// check if it's in UTF-8
if strings.ToLower(taskMsg.ContentEncoding) != "utf-8" {
log.Printf("Warning: task %s is not encoded in utf-8 but: %s", task.GetName(), taskMsg.ContentEncoding)
taskMsg.Reject(false)
}
// check if it's JSON
if strings.ToLower(taskMsg.ContentType) != "application/json" {
log.Printf("Warning: task %s is not json but: %s", task.GetName(), taskMsg.ContentType)
taskMsg.Reject(false)
}
log.Printf("Decoding %v", taskMsg.Body)
if err := json.Unmarshal(taskMsg.Body, &task; err != nil {
log.Printf("Could not decode task %s body: %v", task.GetName(), err)
taskMsg.Reject(false)
continue
}
log.Printf("Executing %s with args %v (from %v)", task.GetName(), task, taskMsg.Body)
output, err := task.Process(input)
if err != nil {
log.Printf("Task '%s' error: %s", task.GetName(), err)
taskMsg.Reject(true)
continue
}
if err := sendResult(chann, task, output); err != nil {
log.Printf("Could not send result for task: " + task.GetName())
taskMsg.Reject(true)
continue
}
if err := taskMsg.Ack(false); err != nil {
log.Println("Could not acknowledge: " + err.Error())
}
}
}()
}
And my task code:
type TaskInput struct{}
type TaskOutput struct{}
type TaskData interface {
Input TaskInput `json:"input"`
Output TaskOutput `json:"output"`
}
type TaskImpl interface {
GetName() string
Process(msg *TaskData) (*TaskData, error)
}
type SampleTaskInput struct {
TaskInput
Arg string
}
type SampleTaskOutput struct {
TaskOutput
Result string
}
type SampleTask struct {
TaskImpl
}
func (SampleTask) GetName() string {
return "sample"
}
func (SampleTask) Process(msg *TaskData) (*TaskData, error){
log.Printf("Executing sampletask with arg: %v", msg.Input)
msg.Output = SampleTaskOutput{}
return SampleTaskOutput{
Result: "String was " + data.Arg,
}, nil
}
edited Nov 23 '18 at 12:35
Flimzy
37.7k96496
37.7k96496
asked Nov 23 '18 at 12:25
nindustriesnindustries
62
62
At the time the call toregisterTaskis made, do you know the type of the task? Or are you passing justnilas the second argument?
– mkopriva
Nov 23 '18 at 12:38
If you don't know the type of the task before reading the body, you'll have to first read the body, inspect the name, and then based on the name initialize the task implementation you need. For example something like this play.golang.com/p/le0iGku58iE
– mkopriva
Nov 23 '18 at 12:50
@mkopriva registerTask is called at initialization andchann.Consume(QUEUE_TODO, task.GetName()...will get the task todos. Every separate task type will launch a separate gofunc.
– nindustries
Nov 23 '18 at 14:48
In your example code your referencinginputon this lineoutput, err := task.Process(input), but it's not being declared anywhere, where is that coming from, what is it's type?
– mkopriva
Nov 23 '18 at 14:59
Ahh, my fault, that should be a reference to TaskData. I've been refactoring a lot to make this compile and break it again..
– nindustries
Nov 23 '18 at 15:00
|
show 2 more comments
At the time the call toregisterTaskis made, do you know the type of the task? Or are you passing justnilas the second argument?
– mkopriva
Nov 23 '18 at 12:38
If you don't know the type of the task before reading the body, you'll have to first read the body, inspect the name, and then based on the name initialize the task implementation you need. For example something like this play.golang.com/p/le0iGku58iE
– mkopriva
Nov 23 '18 at 12:50
@mkopriva registerTask is called at initialization andchann.Consume(QUEUE_TODO, task.GetName()...will get the task todos. Every separate task type will launch a separate gofunc.
– nindustries
Nov 23 '18 at 14:48
In your example code your referencinginputon this lineoutput, err := task.Process(input), but it's not being declared anywhere, where is that coming from, what is it's type?
– mkopriva
Nov 23 '18 at 14:59
Ahh, my fault, that should be a reference to TaskData. I've been refactoring a lot to make this compile and break it again..
– nindustries
Nov 23 '18 at 15:00
At the time the call to
registerTask is made, do you know the type of the task? Or are you passing just nil as the second argument?– mkopriva
Nov 23 '18 at 12:38
At the time the call to
registerTask is made, do you know the type of the task? Or are you passing just nil as the second argument?– mkopriva
Nov 23 '18 at 12:38
If you don't know the type of the task before reading the body, you'll have to first read the body, inspect the name, and then based on the name initialize the task implementation you need. For example something like this play.golang.com/p/le0iGku58iE
– mkopriva
Nov 23 '18 at 12:50
If you don't know the type of the task before reading the body, you'll have to first read the body, inspect the name, and then based on the name initialize the task implementation you need. For example something like this play.golang.com/p/le0iGku58iE
– mkopriva
Nov 23 '18 at 12:50
@mkopriva registerTask is called at initialization and
chann.Consume(QUEUE_TODO, task.GetName()... will get the task todos. Every separate task type will launch a separate gofunc.– nindustries
Nov 23 '18 at 14:48
@mkopriva registerTask is called at initialization and
chann.Consume(QUEUE_TODO, task.GetName()... will get the task todos. Every separate task type will launch a separate gofunc.– nindustries
Nov 23 '18 at 14:48
In your example code your referencing
input on this line output, err := task.Process(input), but it's not being declared anywhere, where is that coming from, what is it's type?– mkopriva
Nov 23 '18 at 14:59
In your example code your referencing
input on this line output, err := task.Process(input), but it's not being declared anywhere, where is that coming from, what is it's type?– mkopriva
Nov 23 '18 at 14:59
Ahh, my fault, that should be a reference to TaskData. I've been refactoring a lot to make this compile and break it again..
– nindustries
Nov 23 '18 at 15:00
Ahh, my fault, that should be a reference to TaskData. I've been refactoring a lot to make this compile and break it again..
– nindustries
Nov 23 '18 at 15:00
|
show 2 more comments
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53446706%2fgo-inheritance-for-task-handling%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53446706%2fgo-inheritance-for-task-handling%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
At the time the call to
registerTaskis made, do you know the type of the task? Or are you passing justnilas the second argument?– mkopriva
Nov 23 '18 at 12:38
If you don't know the type of the task before reading the body, you'll have to first read the body, inspect the name, and then based on the name initialize the task implementation you need. For example something like this play.golang.com/p/le0iGku58iE
– mkopriva
Nov 23 '18 at 12:50
@mkopriva registerTask is called at initialization and
chann.Consume(QUEUE_TODO, task.GetName()...will get the task todos. Every separate task type will launch a separate gofunc.– nindustries
Nov 23 '18 at 14:48
In your example code your referencing
inputon this lineoutput, err := task.Process(input), but it's not being declared anywhere, where is that coming from, what is it's type?– mkopriva
Nov 23 '18 at 14:59
Ahh, my fault, that should be a reference to TaskData. I've been refactoring a lot to make this compile and break it again..
– nindustries
Nov 23 '18 at 15:00